diff --git a/src/app.module.ts b/src/app.module.ts index 662f01d..ca21461 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -16,6 +16,7 @@ import { RawBodyMiddleware } from './commons/middlewares/raw-body.middleware'; import { GiteaWebhooksController } from './webhooks/gitea-webhooks.controller'; import { ParseBodyMiddleware } from './commons/middlewares/parse-body.middleware'; import { BullModule } from '@nestjs/bull'; +import { PubSubModule } from './commons/pub-sub/pub-sub.module'; @Module({ imports: [ @@ -51,8 +52,19 @@ import { BullModule } from '@nestjs/bull'; useFactory: (configService: ConfigService) => ({ redis: { host: configService.get('db.redis.host', 'localhost'), - port: configService.get('db.redis.port', 6379), - password: configService.get('db.redis.password', ''), + port: configService.get('db.redis.port', undefined), + password: configService.get('db.redis.password', undefined), + }, + }), + inject: [ConfigService], + }), + PubSubModule.forRootAsync({ + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + redis: { + host: configService.get('db.redis.host', 'localhost'), + port: configService.get('db.redis.port', undefined), + password: configService.get('db.redis.password', undefined), }, }), inject: [ConfigService], diff --git a/src/commons/pipes/sanitize.pipe.ts b/src/commons/pipes/sanitize.pipe.ts index 2ca94fd..d438f3e 100644 --- a/src/commons/pipes/sanitize.pipe.ts +++ b/src/commons/pipes/sanitize.pipe.ts @@ -1,5 +1,5 @@ import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common'; -import { sanitize } from '@neuralegion/class-sanitizer/dist'; +import { plainToClass } from 'class-transformer'; @Injectable() export class SanitizePipe implements PipeTransform { @@ -12,13 +12,11 @@ export class SanitizePipe implements PipeTransform { return value; } const constructorFunction = metadata.metatype; - if (!constructorFunction) { + if (!constructorFunction || value instanceof constructorFunction) { return value; } - value = Object.assign(new constructorFunction(), value); try { - sanitize(value); - return value; + return plainToClass(constructorFunction, value); } catch (err) { console.error(err); throw err; diff --git a/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts b/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts new file mode 100644 index 0000000..e5a8579 --- /dev/null +++ b/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts @@ -0,0 +1,5 @@ +import { Inject } from '@nestjs/common'; +import { getPubSubToken } from '../utils/token'; + +export const InjectPubSub = (name?: string): ParameterDecorator => + Inject(getPubSubToken(name)); diff --git a/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts index 444fd50..498b822 100644 --- a/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts +++ b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts @@ -1,5 +1,5 @@ import { ModuleMetadata } from '@nestjs/common'; -import { PubSubOptions } from 'graphql-subscriptions'; +import { PubSubOptions } from './pub-sub-options.interface'; export interface PubSubAsyncConfig extends Pick { name?: string; diff --git a/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts index 05d376e..769e479 100644 --- a/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts +++ b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts @@ -1,6 +1,6 @@ import { RedisOptions } from 'ioredis'; export interface PubSubOptions { - name: string; + name?: string; redis: RedisOptions; } diff --git a/src/commons/pub-sub/pub-sub.constants.ts b/src/commons/pub-sub/pub-sub.constants.ts index 2b8b9c9..eef63e4 100644 --- a/src/commons/pub-sub/pub-sub.constants.ts +++ b/src/commons/pub-sub/pub-sub.constants.ts @@ -1,2 +1 @@ export const DEFAULT_PUB_SUB_NAME = 'default'; -export const PUB_SUB_CONFIG_TOKEN = 'pub_sub_config_token'; diff --git a/src/commons/pub-sub/pub-sub.module.ts b/src/commons/pub-sub/pub-sub.module.ts index 48c949a..a9d06c4 100644 --- a/src/commons/pub-sub/pub-sub.module.ts +++ b/src/commons/pub-sub/pub-sub.module.ts @@ -1,47 +1,48 @@ -import { DynamicModule, Module, Provider } from '@nestjs/common'; +import { DynamicModule, Module } from '@nestjs/common'; import { PubSubService } from './pub-sub.service'; import { - createOptionsProvider, + createAsyncPubSubProviders, createPubSubProvider, } from './pub-sub.providers'; import { PubSubOptions } from './interfaces/pub-sub-options.interface'; import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; -import { getPubSubToken } from './utils/token'; -import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants'; +import { getPubSubConfigToken } from './utils/token'; @Module({ providers: [PubSubService], }) export class PubSubModule { public static forRoot(options: PubSubOptions): DynamicModule { - const providers = [createPubSubProvider(options)]; - return { - global: true, - module: PubSubModule, - providers, - exports: providers, - }; - } - public static forRootAsync(config: PubSubAsyncConfig) { - const providers: Provider[] = [ - createOptionsProvider(config), + const providers = [ { - provide: getPubSubToken(config.name), - inject: [PUB_SUB_CONFIG_TOKEN], - useFactory: (options: PubSubOptions) => { - return createPubSubProvider({ - name: config.name, - ...options, - }); - }, + provide: getPubSubConfigToken(options.name), + useValue: options, }, + createPubSubProvider(options.name), ]; return { global: true, module: PubSubModule, - imports: config.imports, providers, exports: providers, }; } + public static forRootAsync(...configs: PubSubAsyncConfig[]): DynamicModule { + const providers = createAsyncPubSubProviders(configs); + return { + global: true, + module: PubSubModule, + imports: configs + .map((config) => config.imports) + .flat() + .filter((o, i, a) => a.indexOf(o) === i), + providers, + exports: providers, + }; + } + public static forFeature(): DynamicModule { + return { + module: PubSubModule, + }; + } } diff --git a/src/commons/pub-sub/pub-sub.providers.ts b/src/commons/pub-sub/pub-sub.providers.ts index de14e9c..b131e49 100644 --- a/src/commons/pub-sub/pub-sub.providers.ts +++ b/src/commons/pub-sub/pub-sub.providers.ts @@ -2,22 +2,30 @@ import { Provider } from '@nestjs/common'; import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; import { PubSubOptions } from './interfaces/pub-sub-options.interface'; import { PubSub } from './pub-sub'; -import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants'; -import { getPubSubToken } from './utils/token'; +import { getPubSubConfigToken, getPubSubToken } from './utils/token'; -export function createPubSubProvider(options: PubSubOptions): Provider { +export function createPubSubProvider(name: string): Provider { return { - provide: getPubSubToken(options.name), - useFactory: () => { - return new PubSub(options); - }, + provide: getPubSubToken(name), + useFactory: (option: PubSubOptions) => new PubSub(option), + inject: [getPubSubConfigToken(name)], }; } export function createOptionsProvider(config: PubSubAsyncConfig): Provider { return { - provide: PUB_SUB_CONFIG_TOKEN, + provide: getPubSubConfigToken(config.name), useFactory: config.useFactory, inject: config.inject || [], }; } +export function createAsyncPubSubProviders( + configs: PubSubAsyncConfig[], +): Provider[] { + return configs + .map((config) => [ + createOptionsProvider(config), + createPubSubProvider(config.name), + ]) + .flat(); +} diff --git a/src/commons/pub-sub/pub-sub.ts b/src/commons/pub-sub/pub-sub.ts index 98853f8..12a3282 100644 --- a/src/commons/pub-sub/pub-sub.ts +++ b/src/commons/pub-sub/pub-sub.ts @@ -10,7 +10,7 @@ import { PubSubRawNextMessage, } from './interfaces/pub-sub-raw-message.interface'; -const log = debug('app:pubsub:instance'); +const log = debug('fennec:pubsub:instance'); export class PubSub extends EventEmitter { pubRedis: Redis; pSubRedis: Redis; diff --git a/src/commons/pub-sub/utils/token.ts b/src/commons/pub-sub/utils/token.ts index b4f6472..de8e6ba 100644 --- a/src/commons/pub-sub/utils/token.ts +++ b/src/commons/pub-sub/utils/token.ts @@ -3,3 +3,6 @@ import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants'; export function getPubSubToken(name) { return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`; } +export function getPubSubConfigToken(name) { + return `app:pub-usb:config:${name || DEFAULT_PUB_SUB_NAME}`; +} diff --git a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts index 09bf4ca..215560e 100644 --- a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts +++ b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts @@ -1,6 +1,7 @@ import { PipelineTask } from './../pipeline-task.entity'; import { PipelineUnits } from '../enums/pipeline-units.enum'; import { Field, HideField, ObjectType } from '@nestjs/graphql'; +import { Type } from 'class-transformer'; @ObjectType() export class PipelineTaskLogMessage { @@ -9,6 +10,7 @@ export class PipelineTaskLogMessage { @Field(() => PipelineUnits, { nullable: true }) unit?: PipelineUnits; @Field() + @Type(() => Date) time: Date; @Field() message: string; diff --git a/src/pipeline-tasks/pipeline-task-logs.service.ts b/src/pipeline-tasks/pipeline-task-logs.service.ts index 328ff64..86f915a 100644 --- a/src/pipeline-tasks/pipeline-task-logs.service.ts +++ b/src/pipeline-tasks/pipeline-task-logs.service.ts @@ -1,8 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { log } from 'console'; -import { PubSub } from 'graphql-subscriptions'; +import { observableToAsyncIterable } from 'graphql-tools'; import { RedisService } from 'nestjs-redis'; import { find, omit, propEq } from 'ramda'; +import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator'; +import { PubSub } from '../commons/pub-sub/pub-sub'; import { PipelineUnits } from './enums/pipeline-units.enum'; import { TaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; @@ -13,9 +14,11 @@ const LOG_TIMEOUT_SECONDS = 10_000; @Injectable() export class PipelineTaskLogsService { - constructor(private readonly redisService: RedisService) {} - - pubSub = new PubSub(); + constructor( + private readonly redisService: RedisService, + @InjectPubSub() + private readonly pubSub: PubSub, + ) {} get redis() { return this.redisService.getClient(); @@ -73,6 +76,6 @@ export class PipelineTaskLogsService { } watchLogs(task: PipelineTask) { - return this.pubSub.asyncIterator(this.getKeys(task)); + return observableToAsyncIterable(this.pubSub.message$(this.getKeys(task))); } } diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index 29b2c5b..777e284 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -1,3 +1,2 @@ export const PIPELINE_TASK_QUEUE = 'PIPELINE_TASK_QUEUE'; export const PIPELINE_TASK_LOG_QUEUE = 'PIPELINE_TASK_LOG_QUEUE'; -export const PIPELINE_TASK_LOG_PUBSUB = 'PIPELINE_TASK_LOG_PUBSUB'; diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index 7c84304..a7e5c6c 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -8,12 +8,10 @@ import { ReposModule } from '../repos/repos.module'; import { RedisModule } from 'nestjs-redis'; import { BullModule } from '@nestjs/bull'; import { PipelineTaskConsumer } from './pipeline-task.consumer'; -import { - PIPELINE_TASK_QUEUE, - PIPELINE_TASK_LOG_PUBSUB, -} from './pipeline-tasks.constants'; +import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { PipelineTaskLogsService } from './pipeline-task-logs.service'; import { PubSub } from 'apollo-server-express'; +import { PubSubModule } from '../commons/pub-sub/pub-sub.module'; @Module({ imports: [ @@ -21,6 +19,7 @@ import { PubSub } from 'apollo-server-express'; BullModule.registerQueue({ name: PIPELINE_TASK_QUEUE, }), + PubSubModule.forFeature(), RedisModule, ReposModule, ], @@ -29,10 +28,6 @@ import { PubSub } from 'apollo-server-express'; PipelineTasksResolver, PipelineTaskConsumer, PipelineTaskLogsService, - { - provide: Symbol(PIPELINE_TASK_LOG_PUBSUB), - useValue: new PubSub(), - }, ], exports: [PipelineTasksService], }) diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.ts b/src/pipeline-tasks/pipeline-tasks.resolver.ts index 603f39e..8c80313 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.ts @@ -5,6 +5,7 @@ import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args'; import { PipelineTaskLogsService } from './pipeline-task-logs.service'; +import { plainToClass } from 'class-transformer'; @Resolver() export class PipelineTasksResolver { @@ -20,7 +21,8 @@ export class PipelineTasksResolver { @Subscription(() => PipelineTaskLogMessage, { resolve: (value) => { - return value; + const data = plainToClass(PipelineTaskLogMessage, value); + return data; }, }) async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) { diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 3258af6..ce9e93e 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -9,16 +9,17 @@ import { InjectQueue } from '@nestjs/bull'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; -import { PubSub } from 'apollo-server-express'; import { TaskStatuses } from './enums/task-statuses.enum'; import { isNil } from 'ramda'; import debug from 'debug'; +import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator'; +import { PubSub } from '../commons/pub-sub/pub-sub'; +import { observableToAsyncIterable } from '@graphql-tools/utils'; const log = debug('fennec:pipeline-tasks:service'); @Injectable() export class PipelineTasksService { - pubSub = new PubSub(); constructor( @InjectRepository(PipelineTask) private readonly repository: Repository, @@ -27,6 +28,8 @@ export class PipelineTasksService { @InjectQueue(PIPELINE_TASK_QUEUE) private readonly queue: Queue, private readonly redis: RedisService, + @InjectPubSub() + private readonly pubSub: PubSub, ) {} async addTask(dto: CreatePipelineTaskInput) { const pipeline = await this.pipelineRepository.findOneOrFail({ @@ -118,12 +121,12 @@ export class PipelineTasksService { } async updateTask(task: PipelineTask) { - this.pubSub.publish(task.id, task); + this.pubSub.publish(`task:${task.id}`, task); return await this.repository.save(task); } async watchTaskUpdated(id: string) { - return this.pubSub.asyncIterator(id); + return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); } getRedisTokens(pipeline: Pipeline): [string, string] { diff --git a/src/pipelines/commit-logs.resolver.ts b/src/pipelines/commit-logs.resolver.ts index 575aa16..df65882 100644 --- a/src/pipelines/commit-logs.resolver.ts +++ b/src/pipelines/commit-logs.resolver.ts @@ -22,11 +22,7 @@ export class CommitLogsResolver { return value; }, }) - async listLogsForPipeline( - @Args('id', { type: () => String }) id: string, - @Info() info: GraphQLResolveInfo, - ) { - info.returnType.toString(); + async listLogsForPipeline(@Args('id', { type: () => String }) id: string) { const job = await this.service.listLogsForPipeline(id); return (async function* () { yield await job.finished();