feat-pub-sub #3

Merged
Ivan merged 3 commits from feat-pub-sub into master 2021-04-04 13:29:00 +08:00
4 changed files with 8 additions and 5 deletions
Showing only changes of commit 46fb41f856 - Show all commits

View File

@ -2,7 +2,7 @@ import debug from 'debug';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import IORedis, { Redis } from 'ioredis'; import IORedis, { Redis } from 'ioredis';
import { from, fromEvent, Observable } from 'rxjs'; import { from, fromEvent, Observable } from 'rxjs';
import { filter, map, switchMap, takeWhile, tap } from 'rxjs/operators'; import { filter, map, share, switchMap, takeWhile, tap } from 'rxjs/operators';
import { ApplicationException } from '../exceptions/application.exception'; import { ApplicationException } from '../exceptions/application.exception';
import { PubSubOptions } from './interfaces/pub-sub-options.interface'; import { PubSubOptions } from './interfaces/pub-sub-options.interface';
import { import {
@ -22,6 +22,10 @@ export class PubSub extends EventEmitter {
this.pubRedis = new IORedis(this.options.redis); this.pubRedis = new IORedis(this.options.redis);
this.pSubRedis = new IORedis(this.options.redis); this.pSubRedis = new IORedis(this.options.redis);
this.pSubRedis.on('pmessage', (...args) =>
log.extend('raw')('%s %s %o', ...args),
);
this.event$ = fromEvent<[string, string, string]>( this.event$ = fromEvent<[string, string, string]>(
this.pSubRedis, this.pSubRedis,
'pmessage', 'pmessage',
@ -37,6 +41,7 @@ export class PubSub extends EventEmitter {
return ev; return ev;
}), }),
filter((v) => !!v), filter((v) => !!v),
share(),
); );
} }
@ -88,7 +93,7 @@ export class PubSub extends EventEmitter {
filter(([pattern]) => pattern === channel), filter(([pattern]) => pattern === channel),
tap(([pattern, channel, message]) => { tap(([pattern, channel, message]) => {
log.extend('subscribe')( log.extend('subscribe')(
'channel: %s, match: %, message: %O', 'channel: %s, match: %s, message: %O',
channel, channel,
pattern, pattern,
message, message,

View File

@ -10,7 +10,6 @@ import { BullModule } from '@nestjs/bull';
import { PipelineTaskConsumer } from './pipeline-task.consumer'; import { PipelineTaskConsumer } from './pipeline-task.consumer';
import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants'; import { PIPELINE_TASK_QUEUE } from './pipeline-tasks.constants';
import { PipelineTaskLogsService } from './pipeline-task-logs.service'; import { PipelineTaskLogsService } from './pipeline-task-logs.service';
import { PubSub } from 'apollo-server-express';
import { PubSubModule } from '../commons/pub-sub/pub-sub.module'; import { PubSubModule } from '../commons/pub-sub/pub-sub.module';
@Module({ @Module({

View File

@ -126,6 +126,7 @@ export class PipelineTasksService {
} }
async watchTaskUpdated(id: string) { async watchTaskUpdated(id: string) {
log('watchTaskUpdated %s', id);
return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); return observableToAsyncIterable(this.pubSub.message$(`task:${id}`));
} }

View File

@ -1,12 +1,10 @@
import { import {
Args, Args,
Info,
Parent, Parent,
ResolveField, ResolveField,
Resolver, Resolver,
Subscription, Subscription,
} from '@nestjs/graphql'; } from '@nestjs/graphql';
import { GraphQLResolveInfo } from 'graphql';
import { PipelineTasksService } from '../pipeline-tasks/pipeline-tasks.service'; import { PipelineTasksService } from '../pipeline-tasks/pipeline-tasks.service';
import { LogFields, LogList } from '../repos/dtos/log-list.model'; import { LogFields, LogList } from '../repos/dtos/log-list.model';
import { PipelinesService } from './pipelines.service'; import { PipelinesService } from './pipelines.service';