diff --git a/.vscode/settings.json b/.vscode/settings.json index df9c0fb..4c6ad29 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,6 +6,8 @@ "lpush", "lrange", "metatype", + "pmessage", + "psubscribe", "rpop", "rpush" ] diff --git a/package-lock.json b/package-lock.json index 17c0447..d1d62b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,6 +27,7 @@ "debug": "^4.3.1", "graphql": "^15.5.0", "graphql-tools": "^7.0.2", + "ioredis": "^4.25.0", "js-yaml": "^4.0.0", "nestjs-redis": "^1.2.8", "observable-to-async-generator": "^1.0.1-rc", @@ -45,6 +46,7 @@ "@types/body-parser": "^1.19.0", "@types/debug": "^4.1.5", "@types/express": "^4.17.8", + "@types/ioredis": "^4.22.2", "@types/jest": "^26.0.15", "@types/js-yaml": "^4.0.0", "@types/node": "^14.14.6", @@ -3278,9 +3280,9 @@ "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA==" }, "node_modules/@types/ioredis": { - "version": "4.22.1", - "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz", - "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==", + "version": "4.22.2", + "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz", + "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==", "dependencies": { "@types/node": "*" } @@ -8553,9 +8555,9 @@ } }, "node_modules/ioredis": { - "version": "4.24.4", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz", - "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==", + "version": "4.25.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz", + "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==", "dependencies": { "cluster-key-slot": "^1.1.0", "debug": "^4.3.1", @@ -18258,9 +18260,9 @@ "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA==" }, "@types/ioredis": { - "version": "4.22.1", - "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz", - "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==", + "version": "4.22.2", + "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz", + "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==", "requires": { "@types/node": "*" } @@ -22385,9 +22387,9 @@ "dev": true }, "ioredis": { - "version": "4.24.4", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz", - "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==", + "version": "4.25.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz", + "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==", "requires": { "cluster-key-slot": "^1.1.0", "debug": "^4.3.1", diff --git a/package.json b/package.json index a826c32..e755891 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "debug": "^4.3.1", "graphql": "^15.5.0", "graphql-tools": "^7.0.2", + "ioredis": "^4.25.0", "js-yaml": "^4.0.0", "nestjs-redis": "^1.2.8", "observable-to-async-generator": "^1.0.1-rc", @@ -58,6 +59,7 @@ "@types/body-parser": "^1.19.0", "@types/debug": "^4.1.5", "@types/express": "^4.17.8", + "@types/ioredis": "^4.22.2", "@types/jest": "^26.0.15", "@types/js-yaml": "^4.0.0", "@types/node": "^14.14.6", diff --git a/src/app.module.ts b/src/app.module.ts index 662f01d..ca21461 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -16,6 +16,7 @@ import { RawBodyMiddleware } from './commons/middlewares/raw-body.middleware'; import { GiteaWebhooksController } from './webhooks/gitea-webhooks.controller'; import { ParseBodyMiddleware } from './commons/middlewares/parse-body.middleware'; import { BullModule } from '@nestjs/bull'; +import { PubSubModule } from './commons/pub-sub/pub-sub.module'; @Module({ imports: [ @@ -51,8 +52,19 @@ import { BullModule } from '@nestjs/bull'; useFactory: (configService: ConfigService) => ({ redis: { host: configService.get('db.redis.host', 'localhost'), - port: configService.get('db.redis.port', 6379), - password: configService.get('db.redis.password', ''), + port: configService.get('db.redis.port', undefined), + password: configService.get('db.redis.password', undefined), + }, + }), + inject: [ConfigService], + }), + PubSubModule.forRootAsync({ + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + redis: { + host: configService.get('db.redis.host', 'localhost'), + port: configService.get('db.redis.port', undefined), + password: configService.get('db.redis.password', undefined), }, }), inject: [ConfigService], diff --git a/src/commons/commons.module.ts b/src/commons/commons.module.ts index 12d0563..476581d 100644 --- a/src/commons/commons.module.ts +++ b/src/commons/commons.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common'; import { PasswordConverter } from './services/password-converter'; +import { PubSubModule } from './pub-sub/pub-sub.module'; @Module({ providers: [PasswordConverter], exports: [PasswordConverter], + imports: [PubSubModule], }) export class CommonsModule {} diff --git a/src/commons/pipes/sanitize.pipe.ts b/src/commons/pipes/sanitize.pipe.ts index 2ca94fd..d438f3e 100644 --- a/src/commons/pipes/sanitize.pipe.ts +++ b/src/commons/pipes/sanitize.pipe.ts @@ -1,5 +1,5 @@ import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common'; -import { sanitize } from '@neuralegion/class-sanitizer/dist'; +import { plainToClass } from 'class-transformer'; @Injectable() export class SanitizePipe implements PipeTransform { @@ -12,13 +12,11 @@ export class SanitizePipe implements PipeTransform { return value; } const constructorFunction = metadata.metatype; - if (!constructorFunction) { + if (!constructorFunction || value instanceof constructorFunction) { return value; } - value = Object.assign(new constructorFunction(), value); try { - sanitize(value); - return value; + return plainToClass(constructorFunction, value); } catch (err) { console.error(err); throw err; diff --git a/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts b/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts new file mode 100644 index 0000000..e5a8579 --- /dev/null +++ b/src/commons/pub-sub/decorators/inject-pub-sub.decorator.ts @@ -0,0 +1,5 @@ +import { Inject } from '@nestjs/common'; +import { getPubSubToken } from '../utils/token'; + +export const InjectPubSub = (name?: string): ParameterDecorator => + Inject(getPubSubToken(name)); diff --git a/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts new file mode 100644 index 0000000..498b822 --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts @@ -0,0 +1,8 @@ +import { ModuleMetadata } from '@nestjs/common'; +import { PubSubOptions } from './pub-sub-options.interface'; + +export interface PubSubAsyncConfig extends Pick { + name?: string; + useFactory: (...args: any[]) => Promise | PubSubOptions; + inject?: any[]; +} diff --git a/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts new file mode 100644 index 0000000..769e479 --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts @@ -0,0 +1,6 @@ +import { RedisOptions } from 'ioredis'; + +export interface PubSubOptions { + name?: string; + redis: RedisOptions; +} diff --git a/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts new file mode 100644 index 0000000..5522719 --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts @@ -0,0 +1,15 @@ +export type PubSubRawMessage = + | PubSubRawNextMessage + | PubSubRawErrorMessage + | PubSubRawCompleteMessage; +export interface PubSubRawNextMessage { + type: 'next'; + message: T; +} +export interface PubSubRawErrorMessage { + type: 'error'; + error: string; +} +export interface PubSubRawCompleteMessage { + type: 'complete'; +} diff --git a/src/commons/pub-sub/pub-sub.constants.ts b/src/commons/pub-sub/pub-sub.constants.ts new file mode 100644 index 0000000..eef63e4 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.constants.ts @@ -0,0 +1 @@ +export const DEFAULT_PUB_SUB_NAME = 'default'; diff --git a/src/commons/pub-sub/pub-sub.module.ts b/src/commons/pub-sub/pub-sub.module.ts new file mode 100644 index 0000000..a9d06c4 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.module.ts @@ -0,0 +1,48 @@ +import { DynamicModule, Module } from '@nestjs/common'; +import { PubSubService } from './pub-sub.service'; +import { + createAsyncPubSubProviders, + createPubSubProvider, +} from './pub-sub.providers'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; +import { getPubSubConfigToken } from './utils/token'; + +@Module({ + providers: [PubSubService], +}) +export class PubSubModule { + public static forRoot(options: PubSubOptions): DynamicModule { + const providers = [ + { + provide: getPubSubConfigToken(options.name), + useValue: options, + }, + createPubSubProvider(options.name), + ]; + return { + global: true, + module: PubSubModule, + providers, + exports: providers, + }; + } + public static forRootAsync(...configs: PubSubAsyncConfig[]): DynamicModule { + const providers = createAsyncPubSubProviders(configs); + return { + global: true, + module: PubSubModule, + imports: configs + .map((config) => config.imports) + .flat() + .filter((o, i, a) => a.indexOf(o) === i), + providers, + exports: providers, + }; + } + public static forFeature(): DynamicModule { + return { + module: PubSubModule, + }; + } +} diff --git a/src/commons/pub-sub/pub-sub.providers.ts b/src/commons/pub-sub/pub-sub.providers.ts new file mode 100644 index 0000000..b131e49 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.providers.ts @@ -0,0 +1,31 @@ +import { Provider } from '@nestjs/common'; +import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { PubSub } from './pub-sub'; +import { getPubSubConfigToken, getPubSubToken } from './utils/token'; + +export function createPubSubProvider(name: string): Provider { + return { + provide: getPubSubToken(name), + useFactory: (option: PubSubOptions) => new PubSub(option), + inject: [getPubSubConfigToken(name)], + }; +} + +export function createOptionsProvider(config: PubSubAsyncConfig): Provider { + return { + provide: getPubSubConfigToken(config.name), + useFactory: config.useFactory, + inject: config.inject || [], + }; +} +export function createAsyncPubSubProviders( + configs: PubSubAsyncConfig[], +): Provider[] { + return configs + .map((config) => [ + createOptionsProvider(config), + createPubSubProvider(config.name), + ]) + .flat(); +} diff --git a/src/commons/pub-sub/pub-sub.service.spec.ts b/src/commons/pub-sub/pub-sub.service.spec.ts new file mode 100644 index 0000000..652ea12 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PubSubService } from './pub-sub.service'; + +describe('PubsubService', () => { + let service: PubSubService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [PubSubService], + }).compile(); + + service = module.get(PubSubService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/commons/pub-sub/pub-sub.service.ts b/src/commons/pub-sub/pub-sub.service.ts new file mode 100644 index 0000000..12b7c29 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.service.ts @@ -0,0 +1,9 @@ +import { Injectable } from '@nestjs/common'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; + +@Injectable() +export class PubSubService { + private options = new Map(); + private pubClient; + private; +} diff --git a/src/commons/pub-sub/pub-sub.spec.ts b/src/commons/pub-sub/pub-sub.spec.ts new file mode 100644 index 0000000..b988b9d --- /dev/null +++ b/src/commons/pub-sub/pub-sub.spec.ts @@ -0,0 +1,87 @@ +import debug from 'debug'; +import { tap } from 'rxjs/operators'; +import { PubSub } from './pub-sub'; + +debug.enable('app:pubsub:*'); + +describe('PubSub', () => { + let instance: PubSub; + + beforeEach(async () => { + instance = new PubSub({ + name: 'default', + redis: { + host: 'localhost', + }, + }); + }); + + it('should be defined', () => { + expect(instance).toBeDefined(); + }); + + it('should can send and receive message', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe((val) => results.push(val)); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + }); + + it('should complete', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe((val) => results.push(val)); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await instance.finish('test'); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + }); + it('should error', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + let error: string; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe({ + next: (val) => results.push(val), + error: (err) => (error = err.message), + }); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await instance.throwError('test', 'TEST ERROR MESSAGE'); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + expect(error).toEqual('TEST ERROR MESSAGE'); + }); +}); diff --git a/src/commons/pub-sub/pub-sub.ts b/src/commons/pub-sub/pub-sub.ts new file mode 100644 index 0000000..bb51d56 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.ts @@ -0,0 +1,115 @@ +import debug from 'debug'; +import { EventEmitter } from 'events'; +import IORedis, { Redis } from 'ioredis'; +import { from, fromEvent, Observable } from 'rxjs'; +import { filter, map, share, switchMap, takeWhile, tap } from 'rxjs/operators'; +import { ApplicationException } from '../exceptions/application.exception'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { + PubSubRawMessage, + PubSubRawNextMessage, +} from './interfaces/pub-sub-raw-message.interface'; + +const log = debug('fennec:pubsub:instance'); +export class PubSub extends EventEmitter { + pubRedis: Redis; + pSubRedis: Redis; + channels: string[] = []; + event$: Observable<[string, string, any]>; + + constructor(private readonly options: PubSubOptions) { + super(); + this.pubRedis = new IORedis(this.options.redis); + this.pSubRedis = new IORedis(this.options.redis); + + this.pSubRedis.on('pmessage', (...args) => + log.extend('raw')('%s %s %o', ...args), + ); + + this.event$ = fromEvent<[string, string, string]>( + this.pSubRedis, + 'pmessage', + ).pipe( + map((ev) => { + try { + ev[2] = JSON.parse(ev[2]); + } catch (err) { + log('WARN: is not json'); + return null; + } + log('on message: %s %s %o', ...ev); + return ev; + }), + filter((v) => !!v), + share(), + ); + } + + async disconnect() { + log('disconnecting to redis...'); + this.pubRedis.disconnect(); + this.pSubRedis.disconnect(); + log('disconnected'); + } + + async registerChannel(channel: string) { + if (this.channels.includes(channel)) { + return; + } + this.channels.push(channel); + + return await this.pSubRedis.psubscribe(channel); + } + + private async redisPublish(channel: string, message: PubSubRawMessage) { + log.extend('publish')('channel: %s, message: %O', channel, message); + return await this.pubRedis.publish(channel, JSON.stringify(message)); + } + + async publish(channel: string, message: any): Promise { + return await this.redisPublish(channel, { + type: 'next', + message, + }); + } + + async finish(channel: string): Promise { + return await this.redisPublish(channel, { + type: 'complete', + }); + } + + async throwError(channel: string, error: string): Promise { + return await this.redisPublish(channel, { + type: 'error', + error, + }); + } + + message$ = (channel: string): Observable => { + return from(this.registerChannel(channel)) + .pipe(switchMap(() => this.event$)) + .pipe( + filter(([pattern]) => pattern === channel), + tap(([pattern, channel, message]) => { + log.extend('subscribe')( + 'channel: %s, match: %s, message: %O', + channel, + pattern, + message, + ); + }), + takeWhile(([pattern, channel, message]) => { + if (pattern === channel) { + if (message.type === 'error') { + throw new ApplicationException(message.error); + } + return message.type !== 'complete'; + } + return true; + }), + map((ev) => ev[2] as PubSubRawMessage), + map((message: PubSubRawNextMessage) => message.message), + ); + }; +} diff --git a/src/commons/pub-sub/utils/token.ts b/src/commons/pub-sub/utils/token.ts new file mode 100644 index 0000000..de8e6ba --- /dev/null +++ b/src/commons/pub-sub/utils/token.ts @@ -0,0 +1,8 @@ +import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants'; + +export function getPubSubToken(name) { + return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`; +} +export function getPubSubConfigToken(name) { + return `app:pub-usb:config:${name || DEFAULT_PUB_SUB_NAME}`; +} diff --git a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts index 09bf4ca..215560e 100644 --- a/src/pipeline-tasks/models/pipeline-task-log-message.module.ts +++ b/src/pipeline-tasks/models/pipeline-task-log-message.module.ts @@ -1,6 +1,7 @@ import { PipelineTask } from './../pipeline-task.entity'; import { PipelineUnits } from '../enums/pipeline-units.enum'; import { Field, HideField, ObjectType } from '@nestjs/graphql'; +import { Type } from 'class-transformer'; @ObjectType() export class PipelineTaskLogMessage { @@ -9,6 +10,7 @@ export class PipelineTaskLogMessage { @Field(() => PipelineUnits, { nullable: true }) unit?: PipelineUnits; @Field() + @Type(() => Date) time: Date; @Field() message: string; diff --git a/src/pipeline-tasks/pipeline-task-logs.service.ts b/src/pipeline-tasks/pipeline-task-logs.service.ts index 328ff64..86f915a 100644 --- a/src/pipeline-tasks/pipeline-task-logs.service.ts +++ b/src/pipeline-tasks/pipeline-task-logs.service.ts @@ -1,8 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { log } from 'console'; -import { PubSub } from 'graphql-subscriptions'; +import { observableToAsyncIterable } from 'graphql-tools'; import { RedisService } from 'nestjs-redis'; import { find, omit, propEq } from 'ramda'; +import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator'; +import { PubSub } from '../commons/pub-sub/pub-sub'; import { PipelineUnits } from './enums/pipeline-units.enum'; import { TaskStatuses } from './enums/task-statuses.enum'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; @@ -13,9 +14,11 @@ const LOG_TIMEOUT_SECONDS = 10_000; @Injectable() export class PipelineTaskLogsService { - constructor(private readonly redisService: RedisService) {} - - pubSub = new PubSub(); + constructor( + private readonly redisService: RedisService, + @InjectPubSub() + private readonly pubSub: PubSub, + ) {} get redis() { return this.redisService.getClient(); @@ -73,6 +76,6 @@ export class PipelineTaskLogsService { } watchLogs(task: PipelineTask) { - return this.pubSub.asyncIterator(this.getKeys(task)); + return observableToAsyncIterable(this.pubSub.message$(this.getKeys(task))); } } diff --git a/src/pipeline-tasks/pipeline-tasks.constants.ts b/src/pipeline-tasks/pipeline-tasks.constants.ts index 29b2c5b..777e284 100644 --- a/src/pipeline-tasks/pipeline-tasks.constants.ts +++ b/src/pipeline-tasks/pipeline-tasks.constants.ts @@ -1,3 +1,2 @@ export const PIPELINE_TASK_QUEUE = 'PIPELINE_TASK_QUEUE'; export const PIPELINE_TASK_LOG_QUEUE = 'PIPELINE_TASK_LOG_QUEUE'; -export const PIPELINE_TASK_LOG_PUBSUB = 'PIPELINE_TASK_LOG_PUBSUB'; diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index 7c84304..1a4e5bb 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -8,12 +8,9 @@ import { ReposModule } from '../repos/repos.module'; import { RedisModule } from 'nestjs-redis'; import { BullModule } from '@nestjs/bull'; import { PipelineTaskConsumer } from './pipeline-task.consumer'; -import { - PIPELINE_TASK_QUEUE, - PIPELINE_TASK_LOG_PUBSUB, -} from './pipeline-tasks.constants'; +import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { PipelineTaskLogsService } from './pipeline-task-logs.service'; -import { PubSub } from 'apollo-server-express'; +import { PubSubModule } from '../commons/pub-sub/pub-sub.module'; @Module({ imports: [ @@ -21,6 +18,7 @@ import { PubSub } from 'apollo-server-express'; BullModule.registerQueue({ name: PIPELINE_TASK_QUEUE, }), + PubSubModule.forFeature(), RedisModule, ReposModule, ], @@ -29,10 +27,6 @@ import { PubSub } from 'apollo-server-express'; PipelineTasksResolver, PipelineTaskConsumer, PipelineTaskLogsService, - { - provide: Symbol(PIPELINE_TASK_LOG_PUBSUB), - useValue: new PubSub(), - }, ], exports: [PipelineTasksService], }) diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.ts b/src/pipeline-tasks/pipeline-tasks.resolver.ts index 603f39e..8c80313 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.ts @@ -5,6 +5,7 @@ import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args'; import { PipelineTaskLogsService } from './pipeline-task-logs.service'; +import { plainToClass } from 'class-transformer'; @Resolver() export class PipelineTasksResolver { @@ -20,7 +21,8 @@ export class PipelineTasksResolver { @Subscription(() => PipelineTaskLogMessage, { resolve: (value) => { - return value; + const data = plainToClass(PipelineTaskLogMessage, value); + return data; }, }) async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) { diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 3258af6..35943fc 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -9,16 +9,17 @@ import { InjectQueue } from '@nestjs/bull'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { Queue } from 'bull'; import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; -import { PubSub } from 'apollo-server-express'; import { TaskStatuses } from './enums/task-statuses.enum'; import { isNil } from 'ramda'; import debug from 'debug'; +import { InjectPubSub } from '../commons/pub-sub/decorators/inject-pub-sub.decorator'; +import { PubSub } from '../commons/pub-sub/pub-sub'; +import { observableToAsyncIterable } from '@graphql-tools/utils'; const log = debug('fennec:pipeline-tasks:service'); @Injectable() export class PipelineTasksService { - pubSub = new PubSub(); constructor( @InjectRepository(PipelineTask) private readonly repository: Repository, @@ -27,6 +28,8 @@ export class PipelineTasksService { @InjectQueue(PIPELINE_TASK_QUEUE) private readonly queue: Queue, private readonly redis: RedisService, + @InjectPubSub() + private readonly pubSub: PubSub, ) {} async addTask(dto: CreatePipelineTaskInput) { const pipeline = await this.pipelineRepository.findOneOrFail({ @@ -118,12 +121,13 @@ export class PipelineTasksService { } async updateTask(task: PipelineTask) { - this.pubSub.publish(task.id, task); + this.pubSub.publish(`task:${task.id}`, task); return await this.repository.save(task); } async watchTaskUpdated(id: string) { - return this.pubSub.asyncIterator(id); + log('watchTaskUpdated %s', id); + return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); } getRedisTokens(pipeline: Pipeline): [string, string] { diff --git a/src/pipelines/commit-logs.resolver.ts b/src/pipelines/commit-logs.resolver.ts index 575aa16..0a55f76 100644 --- a/src/pipelines/commit-logs.resolver.ts +++ b/src/pipelines/commit-logs.resolver.ts @@ -1,12 +1,10 @@ import { Args, - Info, Parent, ResolveField, Resolver, Subscription, } from '@nestjs/graphql'; -import { GraphQLResolveInfo } from 'graphql'; import { PipelineTasksService } from '../pipeline-tasks/pipeline-tasks.service'; import { LogFields, LogList } from '../repos/dtos/log-list.model'; import { PipelinesService } from './pipelines.service'; @@ -22,11 +20,7 @@ export class CommitLogsResolver { return value; }, }) - async listLogsForPipeline( - @Args('id', { type: () => String }) id: string, - @Info() info: GraphQLResolveInfo, - ) { - info.returnType.toString(); + async listLogsForPipeline(@Args('id', { type: () => String }) id: string) { const job = await this.service.listLogsForPipeline(id); return (async function* () { yield await job.finished();