diff --git a/src/commons/pub-sub/pub-sub.ts b/src/commons/pub-sub/pub-sub.ts index 12a3282..bb51d56 100644 --- a/src/commons/pub-sub/pub-sub.ts +++ b/src/commons/pub-sub/pub-sub.ts @@ -2,7 +2,7 @@ import debug from 'debug'; import { EventEmitter } from 'events'; import IORedis, { Redis } from 'ioredis'; import { from, fromEvent, Observable } from 'rxjs'; -import { filter, map, switchMap, takeWhile, tap } from 'rxjs/operators'; +import { filter, map, share, switchMap, takeWhile, tap } from 'rxjs/operators'; import { ApplicationException } from '../exceptions/application.exception'; import { PubSubOptions } from './interfaces/pub-sub-options.interface'; import { @@ -22,6 +22,10 @@ export class PubSub extends EventEmitter { this.pubRedis = new IORedis(this.options.redis); this.pSubRedis = new IORedis(this.options.redis); + this.pSubRedis.on('pmessage', (...args) => + log.extend('raw')('%s %s %o', ...args), + ); + this.event$ = fromEvent<[string, string, string]>( this.pSubRedis, 'pmessage', @@ -37,6 +41,7 @@ export class PubSub extends EventEmitter { return ev; }), filter((v) => !!v), + share(), ); } @@ -88,7 +93,7 @@ export class PubSub extends EventEmitter { filter(([pattern]) => pattern === channel), tap(([pattern, channel, message]) => { log.extend('subscribe')( - 'channel: %s, match: %, message: %O', + 'channel: %s, match: %s, message: %O', channel, pattern, message, diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index a7e5c6c..1a4e5bb 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -10,7 +10,6 @@ import { BullModule } from '@nestjs/bull'; import { PipelineTaskConsumer } from './pipeline-task.consumer'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { PipelineTaskLogsService } from './pipeline-task-logs.service'; -import { PubSub } from 'apollo-server-express'; import { PubSubModule } from '../commons/pub-sub/pub-sub.module'; @Module({ diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index ce9e93e..35943fc 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -126,6 +126,7 @@ export class PipelineTasksService { } async watchTaskUpdated(id: string) { + log('watchTaskUpdated %s', id); return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); } diff --git a/src/pipelines/commit-logs.resolver.ts b/src/pipelines/commit-logs.resolver.ts index df65882..0a55f76 100644 --- a/src/pipelines/commit-logs.resolver.ts +++ b/src/pipelines/commit-logs.resolver.ts @@ -1,12 +1,10 @@ import { Args, - Info, Parent, ResolveField, Resolver, Subscription, } from '@nestjs/graphql'; -import { GraphQLResolveInfo } from 'graphql'; import { PipelineTasksService } from '../pipeline-tasks/pipeline-tasks.service'; import { LogFields, LogList } from '../repos/dtos/log-list.model'; import { PipelinesService } from './pipelines.service';