From e5d1cd9ea2c19177386d0fbbcd6f51fea1127132 Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Sat, 8 Feb 2025 05:27:56 +0000 Subject: [PATCH] feat(server): use doc service (#9967) close CLOUD-94 --- .github/workflows/build-images.yml | 2 +- .../server/src/__tests__/doc/cron.spec.ts | 66 +++-------- .../server/src/__tests__/utils/utils.ts | 23 ++-- packages/backend/server/src/app.module.ts | 15 ++- .../backend/server/src/base/event/eventbus.ts | 5 +- .../backend/server/src/base/utils/request.ts | 16 +++ .../doc-renderer/__tests__/controller.spec.ts | 85 ++++++++++++++ .../doc-renderer/__tests__/service.spec.ts | 85 ++++++++++++++ .../server/src/core/doc-renderer/service.ts | 8 +- .../core/doc-service/__tests__/job.spec.ts | 109 ++++++++++++++++++ .../server/src/core/doc-service/controller.ts | 7 +- .../server/src/core/doc-service/index.ts | 2 + .../server/src/core/doc-service/job.ts | 61 ++++++++++ .../server/src/core/doc/adapters/workspace.ts | 2 +- packages/backend/server/src/core/doc/index.ts | 6 +- packages/backend/server/src/core/doc/job.ts | 54 +-------- .../server/src/core/workspaces/controller.ts | 4 +- 17 files changed, 423 insertions(+), 127 deletions(-) create mode 100644 packages/backend/server/src/core/doc-renderer/__tests__/controller.spec.ts create mode 100644 packages/backend/server/src/core/doc-renderer/__tests__/service.spec.ts create mode 100644 packages/backend/server/src/core/doc-service/__tests__/job.spec.ts create mode 100644 packages/backend/server/src/core/doc-service/job.ts diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index d711486003ec1..963f228c70f76 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -28,7 +28,7 @@ jobs: extra-flags: workspaces focus @affine/server - name: Build Server run: | - rm -rf packages/backend/server/src/__tests__ + find packages/backend/server -type d -name "__tests__" -exec rm -rf {} + yarn workspace @affine/server build - name: Upload server dist uses: actions/upload-artifact@v4 diff --git a/packages/backend/server/src/__tests__/doc/cron.spec.ts b/packages/backend/server/src/__tests__/doc/cron.spec.ts index 9cf045ac79cb6..5b90f8db109b5 100644 --- a/packages/backend/server/src/__tests__/doc/cron.spec.ts +++ b/packages/backend/server/src/__tests__/doc/cron.spec.ts @@ -1,67 +1,39 @@ -import { mock } from 'node:test'; - import { ScheduleModule } from '@nestjs/schedule'; -import { TestingModule } from '@nestjs/testing'; import { PrismaClient } from '@prisma/client'; -import test from 'ava'; -import * as Sinon from 'sinon'; +import ava, { TestFn } from 'ava'; -import { Config } from '../../base/config'; import { DocStorageModule } from '../../core/doc'; import { DocStorageCronJob } from '../../core/doc/job'; -import { createTestingModule } from '../utils'; +import { createTestingModule, type TestingModule } from '../utils'; + +interface Context { + module: TestingModule; + db: PrismaClient; + cronJob: DocStorageCronJob; +} -let m: TestingModule; -let timer: Sinon.SinonFakeTimers; -let db: PrismaClient; +const test = ava as TestFn; // cleanup database before each test -test.before(async () => { - timer = Sinon.useFakeTimers({ - toFake: ['setInterval'], - }); - m = await createTestingModule({ +test.before(async t => { + t.context.module = await createTestingModule({ imports: [ScheduleModule.forRoot(), DocStorageModule], }); - db = m.get(PrismaClient); + t.context.db = t.context.module.get(PrismaClient); + t.context.cronJob = t.context.module.get(DocStorageCronJob); }); -test.after.always(async () => { - await m.close(); - timer.restore(); +test.beforeEach(async t => { + await t.context.module.initTestingDB(); }); -test('should poll when intervel due', async t => { - const manager = m.get(DocStorageCronJob); - const interval = m.get(Config).doc.manager.updatePollInterval; - - let resolve: any; - const fake = mock.method(manager, 'autoMergePendingDocUpdates', () => { - return new Promise(_resolve => { - resolve = _resolve; - }); - }); - - timer.tick(interval); - t.is(fake.mock.callCount(), 1); - - // busy - timer.tick(interval); - // @ts-expect-error private member - t.is(manager.busy, true); - t.is(fake.mock.callCount(), 1); - - resolve(); - await timer.tickAsync(1); - - // @ts-expect-error private member - t.is(manager.busy, false); - timer.tick(interval); - t.is(fake.mock.callCount(), 2); +test.after.always(async t => { + await t.context.module.close(); }); test('should be able to cleanup expired history', async t => { + const { db } = t.context; const timestamp = Date.now(); // insert expired data @@ -93,7 +65,7 @@ test('should be able to cleanup expired history', async t => { let count = await db.snapshotHistory.count(); t.is(count, 20); - await m.get(DocStorageCronJob).cleanupExpiredHistory(); + await t.context.cronJob.cleanupExpiredHistory(); count = await db.snapshotHistory.count(); t.is(count, 10); diff --git a/packages/backend/server/src/__tests__/utils/utils.ts b/packages/backend/server/src/__tests__/utils/utils.ts index 1064b43fafc35..05f2042c472f4 100644 --- a/packages/backend/server/src/__tests__/utils/utils.ts +++ b/packages/backend/server/src/__tests__/utils/utils.ts @@ -1,8 +1,4 @@ -import { - ConsoleLogger, - INestApplication, - ModuleMetadata, -} from '@nestjs/common'; +import { INestApplication, LogLevel, ModuleMetadata } from '@nestjs/common'; import { APP_GUARD, ModuleRef } from '@nestjs/core'; import { Query, Resolver } from '@nestjs/graphql'; import { @@ -17,12 +13,15 @@ import type { Response } from 'supertest'; import supertest from 'supertest'; import { AppModule, FunctionalityModules } from '../../app.module'; -import { GlobalExceptionFilter, Runtime } from '../../base'; +import { AFFiNELogger, GlobalExceptionFilter, Runtime } from '../../base'; import { GqlModule } from '../../base/graphql'; import { AuthGuard, AuthModule } from '../../core/auth'; import { RefreshFeatures0001 } from '../../data/migrations/0001-refresh-features'; import { ModelsModule } from '../../models'; +const TEST_LOG_LEVEL: LogLevel = + (process.env.TEST_LOG_LEVEL as LogLevel) ?? 'fatal'; + async function flushDB(client: PrismaClient) { const result: { tablename: string }[] = await client.$queryRaw`SELECT tablename @@ -39,7 +38,7 @@ async function flushDB(client: PrismaClient) { ); } -interface TestingModuleMeatdata extends ModuleMetadata { +interface TestingModuleMetadata extends ModuleMetadata { tapModule?(m: TestingModuleBuilder): void; tapApp?(app: INestApplication): void; } @@ -85,7 +84,7 @@ class MockResolver { } export async function createTestingModule( - moduleDef: TestingModuleMeatdata = {}, + moduleDef: TestingModuleMetadata = {}, autoInitialize = true ): Promise { // setting up @@ -127,7 +126,7 @@ export async function createTestingModule( // can't tolerate the noisy logs // @ts-expect-error private m.applyLogger({ - logger: ['fatal'], + logger: [TEST_LOG_LEVEL], }); const runtime = m.get(Runtime); // by pass password min length validation @@ -146,7 +145,7 @@ export async function createTestingModule( } export async function createTestingApp( - moduleDef: TestingModuleMeatdata = {} + moduleDef: TestingModuleMetadata = {} ): Promise<{ module: TestingModule; app: TestingApp }> { const m = await createTestingModule(moduleDef, false); @@ -155,9 +154,9 @@ export async function createTestingApp( bodyParser: true, rawBody: true, }) as TestingApp; - const logger = new ConsoleLogger(); + const logger = new AFFiNELogger(); - logger.setLogLevels(['fatal']); + logger.setLogLevels([TEST_LOG_LEVEL]); app.useLogger(logger); app.useGlobalFilters(new GlobalExceptionFilter(app.getHttpAdapter())); diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index 13f68a355a4dd..fb194c95c4785 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -1,5 +1,3 @@ -import { randomUUID } from 'node:crypto'; - import { DynamicModule, ForwardReference, @@ -15,7 +13,7 @@ import { get } from 'lodash-es'; import { ClsModule } from 'nestjs-cls'; import { AppController } from './app.controller'; -import { getOptionalModuleMetadata } from './base'; +import { genRequestId, getOptionalModuleMetadata } from './base'; import { CacheModule } from './base/cache'; import { AFFiNEConfig, ConfigModule, mergeConfigOverride } from './base/config'; import { ErrorModule } from './base/error'; @@ -59,7 +57,7 @@ export const FunctionalityModules = [ generateId: true, idGenerator(req: Request) { // make every request has a unique id to tracing - return req.get('x-rpc-trace-id') ?? `req-${randomUUID()}`; + return req.get('x-rpc-trace-id') ?? genRequestId('req'); }, setup(cls, _req, res: Response) { res.setHeader('X-Request-Id', cls.getId()); @@ -72,7 +70,7 @@ export const FunctionalityModules = [ generateId: true, idGenerator() { // make every request has a unique id to tracing - return `ws-${randomUUID()}`; + return genRequestId('ws'); }, }, plugins: [ @@ -200,6 +198,12 @@ export function buildAppModule() { .use(...FunctionalityModules) .use(ModelsModule) + // enable schedule module on graphql server and doc service + .useIf( + config => config.flavor.graphql || config.flavor.doc, + ScheduleModule.forRoot() + ) + // auth .use(UserModule, AuthModule, PermissionModule) @@ -212,7 +216,6 @@ export function buildAppModule() { // graphql server only .useIf( config => config.flavor.graphql, - ScheduleModule.forRoot(), GqlModule, StorageModule, ServerConfigModule, diff --git a/packages/backend/server/src/base/event/eventbus.ts b/packages/backend/server/src/base/event/eventbus.ts index e1ee65612e1c2..8b017369f8e63 100644 --- a/packages/backend/server/src/base/event/eventbus.ts +++ b/packages/backend/server/src/base/event/eventbus.ts @@ -1,5 +1,3 @@ -import { randomUUID } from 'node:crypto'; - import { applyDecorators, Injectable, @@ -21,6 +19,7 @@ import { CLS_ID, ClsService } from 'nestjs-cls'; import type { Server, Socket } from 'socket.io'; import { CallMetric } from '../metrics'; +import { genRequestId } from '../utils'; import type { EventName } from './def'; const EventHandlerWrapper = (event: EventName): MethodDecorator => { @@ -94,7 +93,7 @@ export class EventBus implements OnGatewayConnection, OnApplicationBootstrap { // to internal event system this.server?.on(event, (payload, requestId?: string) => { this.cls.run(() => { - requestId = requestId ?? `server_event-${randomUUID()}`; + requestId = requestId ?? genRequestId('se'); this.cls.set(CLS_ID, requestId); this.logger.log(`Server Event: ${event} (Received)`); this.emit(event, payload); diff --git a/packages/backend/server/src/base/utils/request.ts b/packages/backend/server/src/base/utils/request.ts index bb52dac037f0b..ac787938f8966 100644 --- a/packages/backend/server/src/base/utils/request.ts +++ b/packages/backend/server/src/base/utils/request.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'node:crypto'; import { IncomingMessage } from 'node:http'; import type { ArgumentsHost, ExecutionContext } from '@nestjs/common'; @@ -77,3 +78,18 @@ export function parseCookies( {} as Record ); } + +/** + * Request type + * + * @description + * - `req`: http request + * - `ws`: websocket request + * - `se`: server event + * - `job`: cron job + */ +export type RequestType = 'req' | 'ws' | 'se' | 'job'; + +export function genRequestId(type: RequestType) { + return `${AFFiNE.flavor.type}:${type}-${randomUUID()}`; +} diff --git a/packages/backend/server/src/core/doc-renderer/__tests__/controller.spec.ts b/packages/backend/server/src/core/doc-renderer/__tests__/controller.spec.ts new file mode 100644 index 0000000000000..cb5248d1e8564 --- /dev/null +++ b/packages/backend/server/src/core/doc-renderer/__tests__/controller.spec.ts @@ -0,0 +1,85 @@ +import { randomUUID } from 'node:crypto'; + +import { User, Workspace } from '@prisma/client'; +import ava, { TestFn } from 'ava'; +import request from 'supertest'; +import { Doc as YDoc } from 'yjs'; + +import { createTestingApp, type TestingApp } from '../../../__tests__/utils'; +import { AppModule } from '../../../app.module'; +import { Config } from '../../../base'; +import { ConfigModule } from '../../../base/config'; +import { Models } from '../../../models'; +import { PgWorkspaceDocStorageAdapter } from '../../doc'; +import { PermissionService } from '../../permission'; + +const test = ava as TestFn<{ + models: Models; + app: TestingApp; + config: Config; + adapter: PgWorkspaceDocStorageAdapter; + permission: PermissionService; +}>; + +test.before(async t => { + const { app } = await createTestingApp({ + imports: [ + ConfigModule.forRoot({ + flavor: { + doc: false, + }, + docService: { + endpoint: '', + }, + }), + AppModule, + ], + }); + + t.context.models = app.get(Models); + t.context.config = app.get(Config); + t.context.adapter = app.get(PgWorkspaceDocStorageAdapter); + t.context.permission = app.get(PermissionService); + t.context.app = app; +}); + +let user: User; +let workspace: Workspace; + +test.beforeEach(async t => { + t.context.config.docService.endpoint = t.context.app.getHttpServerUrl(); + await t.context.app.initTestingDB(); + user = await t.context.models.user.create({ + email: 'test@affine.pro', + }); + workspace = await t.context.models.workspace.create(user.id); +}); + +test.after.always(async t => { + await t.context.app.close(); +}); + +test('should render page success', async t => { + const docId = randomUUID(); + const { app, adapter, permission } = t.context; + + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await adapter.pushDocUpdates(workspace.id, docId, updates, user.id); + await permission.publishPage(workspace.id, docId); + + await request(app.getHttpServer()) + .get(`/workspace/${workspace.id}/${docId}`) + .expect(200); + t.pass(); +}); diff --git a/packages/backend/server/src/core/doc-renderer/__tests__/service.spec.ts b/packages/backend/server/src/core/doc-renderer/__tests__/service.spec.ts new file mode 100644 index 0000000000000..1fb6d17b75f9d --- /dev/null +++ b/packages/backend/server/src/core/doc-renderer/__tests__/service.spec.ts @@ -0,0 +1,85 @@ +import { randomUUID } from 'node:crypto'; + +import { User, Workspace } from '@prisma/client'; +import ava, { TestFn } from 'ava'; +import { Doc as YDoc } from 'yjs'; + +import { createTestingApp, type TestingApp } from '../../../__tests__/utils'; +import { AppModule } from '../../../app.module'; +import { Config } from '../../../base'; +import { ConfigModule } from '../../../base/config'; +import { Models } from '../../../models'; +import { PgWorkspaceDocStorageAdapter } from '../../doc'; +import { DocContentService } from '..'; + +const test = ava as TestFn<{ + models: Models; + app: TestingApp; + docContentService: DocContentService; + config: Config; + adapter: PgWorkspaceDocStorageAdapter; +}>; + +test.before(async t => { + const { app } = await createTestingApp({ + imports: [ + ConfigModule.forRoot({ + flavor: { + doc: false, + }, + docService: { + endpoint: '', + }, + }), + AppModule, + ], + }); + + t.context.models = app.get(Models); + t.context.docContentService = app.get(DocContentService); + t.context.config = app.get(Config); + t.context.adapter = app.get(PgWorkspaceDocStorageAdapter); + t.context.app = app; +}); + +let user: User; +let workspace: Workspace; + +test.beforeEach(async t => { + t.context.config.docService.endpoint = t.context.app.getHttpServerUrl(); + await t.context.app.initTestingDB(); + user = await t.context.models.user.create({ + email: 'test@affine.pro', + }); + workspace = await t.context.models.workspace.create(user.id); +}); + +test.after.always(async t => { + await t.context.app.close(); +}); + +test('should get doc content from doc service rpc', async t => { + const docId = randomUUID(); + const { docContentService } = t.context; + + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await t.context.adapter.pushDocUpdates(workspace.id, docId, updates, user.id); + + const docContent = await docContentService.getPageContent( + workspace.id, + docId + ); + // TODO(@fengmk2): should create a test ydoc with blocks + t.is(docContent, null); +}); diff --git a/packages/backend/server/src/core/doc-renderer/service.ts b/packages/backend/server/src/core/doc-renderer/service.ts index 131f52f8536d6..a44bb7f035937 100644 --- a/packages/backend/server/src/core/doc-renderer/service.ts +++ b/packages/backend/server/src/core/doc-renderer/service.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import { applyUpdate, Doc } from 'yjs'; import { Cache, OnEvent } from '../../base'; -import { PgWorkspaceDocStorageAdapter } from '../doc'; +import { DocReader } from '../doc'; import { type PageDocContent, parsePageDoc, @@ -14,7 +14,7 @@ import { export class DocContentService { constructor( private readonly cache: Cache, - private readonly workspace: PgWorkspaceDocStorageAdapter + private readonly docReader: DocReader ) {} async getPageContent( @@ -28,7 +28,7 @@ export class DocContentService { return cachedResult; } - const docRecord = await this.workspace.getDoc(workspaceId, guid); + const docRecord = await this.docReader.getDoc(workspaceId, guid); if (!docRecord) { return null; } @@ -61,7 +61,7 @@ export class DocContentService { return cachedResult; } - const docRecord = await this.workspace.getDoc(workspaceId, workspaceId); + const docRecord = await this.docReader.getDoc(workspaceId, workspaceId); if (!docRecord) { return null; } diff --git a/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts b/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts new file mode 100644 index 0000000000000..8c4f2ed7e85b8 --- /dev/null +++ b/packages/backend/server/src/core/doc-service/__tests__/job.spec.ts @@ -0,0 +1,109 @@ +import { mock } from 'node:test'; + +import { ScheduleModule } from '@nestjs/schedule'; +import ava, { TestFn } from 'ava'; +import * as Sinon from 'sinon'; +import { Doc as YDoc } from 'yjs'; + +import { + createTestingModule, + type TestingModule, +} from '../../../__tests__/utils'; +import { Config } from '../../../base'; +import { + DocStorageModule, + PgWorkspaceDocStorageAdapter, +} from '../../../core/doc'; +import { Models } from '../../../models'; +import { DocServiceModule } from '..'; +import { DocServiceCronJob } from '../job'; + +interface Context { + timer: Sinon.SinonFakeTimers; + module: TestingModule; + cronJob: DocServiceCronJob; + config: Config; + adapter: PgWorkspaceDocStorageAdapter; + models: Models; +} + +const test = ava as TestFn; + +// cleanup database before each test +test.before(async t => { + t.context.timer = Sinon.useFakeTimers({ + toFake: ['setInterval'], + }); + t.context.module = await createTestingModule({ + imports: [ScheduleModule.forRoot(), DocStorageModule, DocServiceModule], + }); + + t.context.cronJob = t.context.module.get(DocServiceCronJob); + t.context.config = t.context.module.get(Config); + t.context.adapter = t.context.module.get(PgWorkspaceDocStorageAdapter); + t.context.models = t.context.module.get(Models); +}); + +test.beforeEach(async t => { + await t.context.module.initTestingDB(); +}); + +test.afterEach(async t => { + t.context.timer.restore(); + Sinon.restore(); + mock.reset(); +}); + +test.after.always(async t => { + await t.context.module.close(); +}); + +test('should poll when interval due', async t => { + const cronJob = t.context.cronJob; + const interval = t.context.config.doc.manager.updatePollInterval; + + let resolve: any; + const fake = mock.method(cronJob, 'autoMergePendingDocUpdates', () => { + return new Promise(_resolve => { + resolve = _resolve; + }); + }); + + t.context.timer.tick(interval); + t.is(fake.mock.callCount(), 1); + + // busy + t.context.timer.tick(interval); + // @ts-expect-error private member + t.is(cronJob.busy, true); + t.is(fake.mock.callCount(), 1); + + resolve(); + await t.context.timer.tickAsync(1); + + // @ts-expect-error private member + t.is(cronJob.busy, false); + t.context.timer.tick(interval); + t.is(fake.mock.callCount(), 2); +}); + +test('should auto merge pending doc updates', async t => { + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await t.context.adapter.pushDocUpdates('2', '2', updates); + await t.context.cronJob.autoMergePendingDocUpdates(); + const rows = await t.context.models.doc.findUpdates('2', '2'); + t.is(rows.length, 0); + // again should merge nothing + await t.context.cronJob.autoMergePendingDocUpdates(); +}); diff --git a/packages/backend/server/src/core/doc-service/controller.ts b/packages/backend/server/src/core/doc-service/controller.ts index 15a2ded0f9b7d..b069e84d1da95 100644 --- a/packages/backend/server/src/core/doc-service/controller.ts +++ b/packages/backend/server/src/core/doc-service/controller.ts @@ -1,4 +1,4 @@ -import { Controller, Get, Param, Res } from '@nestjs/common'; +import { Controller, Get, Logger, Param, Res } from '@nestjs/common'; import type { Response } from 'express'; import { NotFound, SkipThrottle } from '../../base'; @@ -7,12 +7,14 @@ import { PgWorkspaceDocStorageAdapter } from '../doc'; @Controller('/rpc') export class DocRpcController { + private readonly logger = new Logger(DocRpcController.name); + constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {} @SkipThrottle() @Internal() @Get('/workspaces/:workspaceId/docs/:docId') - async render( + async getDoc( @Param('workspaceId') workspaceId: string, @Param('docId') docId: string, @Res() res: Response @@ -21,6 +23,7 @@ export class DocRpcController { if (!doc) { throw new NotFound('Doc not found'); } + this.logger.log(`get doc ${docId} from workspace ${workspaceId}`); res.setHeader('x-doc-timestamp', doc.timestamp.toString()); if (doc.editor) { res.setHeader('x-doc-editor-id', doc.editor); diff --git a/packages/backend/server/src/core/doc-service/index.ts b/packages/backend/server/src/core/doc-service/index.ts index 4ec81ff7f230f..762564815fb0e 100644 --- a/packages/backend/server/src/core/doc-service/index.ts +++ b/packages/backend/server/src/core/doc-service/index.ts @@ -2,9 +2,11 @@ import { Module } from '@nestjs/common'; import { DocStorageModule } from '../doc'; import { DocRpcController } from './controller'; +import { DocServiceCronJob } from './job'; @Module({ imports: [DocStorageModule], + providers: [DocServiceCronJob], controllers: [DocRpcController], }) export class DocServiceModule {} diff --git a/packages/backend/server/src/core/doc-service/job.ts b/packages/backend/server/src/core/doc-service/job.ts new file mode 100644 index 0000000000000..45f4ee5d6a17d --- /dev/null +++ b/packages/backend/server/src/core/doc-service/job.ts @@ -0,0 +1,61 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { SchedulerRegistry } from '@nestjs/schedule'; +import { CLS_ID, ClsService } from 'nestjs-cls'; + +import { CallMetric, Config, genRequestId, metrics } from '../../base'; +import { PgWorkspaceDocStorageAdapter } from '../doc'; + +@Injectable() +export class DocServiceCronJob implements OnModuleInit { + private busy = false; + private readonly logger = new Logger(DocServiceCronJob.name); + + constructor( + private readonly config: Config, + private readonly cls: ClsService, + private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly registry: SchedulerRegistry + ) {} + + onModuleInit() { + if (this.config.doc.manager.enableUpdateAutoMerging) { + this.registry.addInterval( + this.autoMergePendingDocUpdates.name, + // scheduler registry will clean up the interval when the app is stopped + setInterval(() => { + if (this.busy) { + return; + } + this.busy = true; + this.autoMergePendingDocUpdates() + .catch(() => { + /* never fail */ + }) + .finally(() => { + this.busy = false; + }); + }, this.config.doc.manager.updatePollInterval) + ); + + this.logger.log('Updates pending queue auto merging cron started'); + } + } + + @CallMetric('doc', 'auto_merge_pending_doc_updates') + async autoMergePendingDocUpdates() { + await this.cls.run(async () => { + this.cls.set(CLS_ID, genRequestId('job')); + try { + const randomDoc = await this.workspace.randomDoc(); + if (!randomDoc) { + return; + } + + await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId); + } catch (e) { + metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1); + this.logger.error('Failed to auto merge pending doc updates', e); + } + }); + } +} diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index 7154c37f621f2..14b3e21352049 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -334,7 +334,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { }); if (updatedSnapshot) { - this.event.emit('doc.snapshot.updated', { + this.event.broadcast('doc.snapshot.updated', { workspaceId: snapshot.spaceId, docId: snapshot.docId, }); diff --git a/packages/backend/server/src/core/doc/index.ts b/packages/backend/server/src/core/doc/index.ts index 0ee0b13c46000..f2ad96ceb9ad6 100644 --- a/packages/backend/server/src/core/doc/index.ts +++ b/packages/backend/server/src/core/doc/index.ts @@ -19,7 +19,11 @@ import { DocReader, DocReaderProvider } from './reader'; DocStorageCronJob, DocReaderProvider, ], - exports: [PgWorkspaceDocStorageAdapter, PgUserspaceDocStorageAdapter], + exports: [ + DocReader, + PgWorkspaceDocStorageAdapter, + PgUserspaceDocStorageAdapter, + ], }) export class DocStorageModule {} export { diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts index 9b3a20ec37bda..f9e3843699361 100644 --- a/packages/backend/server/src/core/doc/job.ts +++ b/packages/backend/server/src/core/doc/job.ts @@ -1,61 +1,17 @@ -import { Injectable, Logger, OnModuleInit, Optional } from '@nestjs/common'; -import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; +import { Injectable } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { PrismaClient } from '@prisma/client'; -import { CallMetric, Config, metrics, OnEvent } from '../../base'; +import { metrics, OnEvent } from '../../base'; import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; @Injectable() -export class DocStorageCronJob implements OnModuleInit { - private busy = false; - private readonly logger = new Logger(DocStorageCronJob.name); - +export class DocStorageCronJob { constructor( - private readonly config: Config, private readonly db: PrismaClient, - private readonly workspace: PgWorkspaceDocStorageAdapter, - @Optional() private readonly registry?: SchedulerRegistry + private readonly workspace: PgWorkspaceDocStorageAdapter ) {} - onModuleInit() { - if (this.registry && this.config.doc.manager.enableUpdateAutoMerging) { - this.registry.addInterval( - this.autoMergePendingDocUpdates.name, - // scheduler registry will clean up the interval when the app is stopped - setInterval(() => { - if (this.busy) { - return; - } - this.busy = true; - this.autoMergePendingDocUpdates() - .catch(() => { - /* never fail */ - }) - .finally(() => { - this.busy = false; - }); - }, this.config.doc.manager.updatePollInterval) - ); - - this.logger.log('Updates pending queue auto merging cron started'); - } - } - - @CallMetric('doc', 'auto_merge_pending_doc_updates') - async autoMergePendingDocUpdates() { - try { - const randomDoc = await this.workspace.randomDoc(); - if (!randomDoc) { - return; - } - - await this.workspace.getDoc(randomDoc.workspaceId, randomDoc.docId); - } catch (e) { - metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1); - this.logger.error('Failed to auto merge pending doc updates', e); - } - } - @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */) async cleanupExpiredHistory() { await this.db.snapshotHistory.deleteMany({ diff --git a/packages/backend/server/src/core/workspaces/controller.ts b/packages/backend/server/src/core/workspaces/controller.ts index dd55a5c3c4665..913fcfe4f261f 100644 --- a/packages/backend/server/src/core/workspaces/controller.ts +++ b/packages/backend/server/src/core/workspaces/controller.ts @@ -13,6 +13,7 @@ import { } from '../../base'; import { CurrentUser, Public } from '../auth'; import { PgWorkspaceDocStorageAdapter } from '../doc'; +import { DocReader } from '../doc/reader'; import { PermissionService, PublicDocMode } from '../permission'; import { WorkspaceBlobStorage } from '../storage'; import { DocID } from '../utils/doc'; @@ -24,6 +25,7 @@ export class WorkspacesController { private readonly storage: WorkspaceBlobStorage, private readonly permission: PermissionService, private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly docReader: DocReader, private readonly prisma: PrismaClient ) {} @@ -95,7 +97,7 @@ export class WorkspacesController { throw new AccessDenied(); } - const binResponse = await this.workspace.getDoc( + const binResponse = await this.docReader.getDoc( docId.workspace, docId.guid );