From bb3efd37145243bbd2f932515fe3763dd5097d9c Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 3 Apr 2021 19:19:02 +0800 Subject: [PATCH] feat: pubsub base redis. --- .vscode/settings.json | 2 + package-lock.json | 26 +++-- package.json | 2 + src/commons/commons.module.ts | 2 + .../pub-sub-async-config.interface.ts | 8 ++ .../interfaces/pub-sub-options.interface.ts | 6 + .../pub-sub-raw-message.interface.ts | 15 +++ src/commons/pub-sub/pub-sub.constants.ts | 2 + src/commons/pub-sub/pub-sub.module.ts | 47 ++++++++ src/commons/pub-sub/pub-sub.providers.ts | 23 ++++ src/commons/pub-sub/pub-sub.service.spec.ts | 18 +++ src/commons/pub-sub/pub-sub.service.ts | 9 ++ src/commons/pub-sub/pub-sub.spec.ts | 87 ++++++++++++++ src/commons/pub-sub/pub-sub.ts | 110 ++++++++++++++++++ src/commons/pub-sub/utils/token.ts | 5 + 15 files changed, 350 insertions(+), 12 deletions(-) create mode 100644 src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts create mode 100644 src/commons/pub-sub/interfaces/pub-sub-options.interface.ts create mode 100644 src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts create mode 100644 src/commons/pub-sub/pub-sub.constants.ts create mode 100644 src/commons/pub-sub/pub-sub.module.ts create mode 100644 src/commons/pub-sub/pub-sub.providers.ts create mode 100644 src/commons/pub-sub/pub-sub.service.spec.ts create mode 100644 src/commons/pub-sub/pub-sub.service.ts create mode 100644 src/commons/pub-sub/pub-sub.spec.ts create mode 100644 src/commons/pub-sub/pub-sub.ts create mode 100644 src/commons/pub-sub/utils/token.ts diff --git a/.vscode/settings.json b/.vscode/settings.json index df9c0fb..4c6ad29 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,6 +6,8 @@ "lpush", "lrange", "metatype", + "pmessage", + "psubscribe", "rpop", "rpush" ] diff --git a/package-lock.json b/package-lock.json index 17c0447..d1d62b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,6 +27,7 @@ "debug": "^4.3.1", "graphql": "^15.5.0", "graphql-tools": "^7.0.2", + "ioredis": "^4.25.0", "js-yaml": "^4.0.0", "nestjs-redis": "^1.2.8", "observable-to-async-generator": "^1.0.1-rc", @@ -45,6 +46,7 @@ "@types/body-parser": "^1.19.0", "@types/debug": "^4.1.5", "@types/express": "^4.17.8", + "@types/ioredis": "^4.22.2", "@types/jest": "^26.0.15", "@types/js-yaml": "^4.0.0", "@types/node": "^14.14.6", @@ -3278,9 +3280,9 @@ "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA==" }, "node_modules/@types/ioredis": { - "version": "4.22.1", - "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz", - "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==", + "version": "4.22.2", + "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz", + "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==", "dependencies": { "@types/node": "*" } @@ -8553,9 +8555,9 @@ } }, "node_modules/ioredis": { - "version": "4.24.4", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz", - "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==", + "version": "4.25.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz", + "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==", "dependencies": { "cluster-key-slot": "^1.1.0", "debug": "^4.3.1", @@ -18258,9 +18260,9 @@ "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA==" }, "@types/ioredis": { - "version": "4.22.1", - "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz", - "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==", + "version": "4.22.2", + "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz", + "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==", "requires": { "@types/node": "*" } @@ -22385,9 +22387,9 @@ "dev": true }, "ioredis": { - "version": "4.24.4", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz", - "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==", + "version": "4.25.0", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz", + "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==", "requires": { "cluster-key-slot": "^1.1.0", "debug": "^4.3.1", diff --git a/package.json b/package.json index a826c32..e755891 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "debug": "^4.3.1", "graphql": "^15.5.0", "graphql-tools": "^7.0.2", + "ioredis": "^4.25.0", "js-yaml": "^4.0.0", "nestjs-redis": "^1.2.8", "observable-to-async-generator": "^1.0.1-rc", @@ -58,6 +59,7 @@ "@types/body-parser": "^1.19.0", "@types/debug": "^4.1.5", "@types/express": "^4.17.8", + "@types/ioredis": "^4.22.2", "@types/jest": "^26.0.15", "@types/js-yaml": "^4.0.0", "@types/node": "^14.14.6", diff --git a/src/commons/commons.module.ts b/src/commons/commons.module.ts index 12d0563..476581d 100644 --- a/src/commons/commons.module.ts +++ b/src/commons/commons.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common'; import { PasswordConverter } from './services/password-converter'; +import { PubSubModule } from './pub-sub/pub-sub.module'; @Module({ providers: [PasswordConverter], exports: [PasswordConverter], + imports: [PubSubModule], }) export class CommonsModule {} diff --git a/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts new file mode 100644 index 0000000..444fd50 --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-async-config.interface.ts @@ -0,0 +1,8 @@ +import { ModuleMetadata } from '@nestjs/common'; +import { PubSubOptions } from 'graphql-subscriptions'; + +export interface PubSubAsyncConfig extends Pick { + name?: string; + useFactory: (...args: any[]) => Promise | PubSubOptions; + inject?: any[]; +} diff --git a/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts new file mode 100644 index 0000000..05d376e --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-options.interface.ts @@ -0,0 +1,6 @@ +import { RedisOptions } from 'ioredis'; + +export interface PubSubOptions { + name: string; + redis: RedisOptions; +} diff --git a/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts b/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts new file mode 100644 index 0000000..5522719 --- /dev/null +++ b/src/commons/pub-sub/interfaces/pub-sub-raw-message.interface.ts @@ -0,0 +1,15 @@ +export type PubSubRawMessage = + | PubSubRawNextMessage + | PubSubRawErrorMessage + | PubSubRawCompleteMessage; +export interface PubSubRawNextMessage { + type: 'next'; + message: T; +} +export interface PubSubRawErrorMessage { + type: 'error'; + error: string; +} +export interface PubSubRawCompleteMessage { + type: 'complete'; +} diff --git a/src/commons/pub-sub/pub-sub.constants.ts b/src/commons/pub-sub/pub-sub.constants.ts new file mode 100644 index 0000000..2b8b9c9 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.constants.ts @@ -0,0 +1,2 @@ +export const DEFAULT_PUB_SUB_NAME = 'default'; +export const PUB_SUB_CONFIG_TOKEN = 'pub_sub_config_token'; diff --git a/src/commons/pub-sub/pub-sub.module.ts b/src/commons/pub-sub/pub-sub.module.ts new file mode 100644 index 0000000..48c949a --- /dev/null +++ b/src/commons/pub-sub/pub-sub.module.ts @@ -0,0 +1,47 @@ +import { DynamicModule, Module, Provider } from '@nestjs/common'; +import { PubSubService } from './pub-sub.service'; +import { + createOptionsProvider, + createPubSubProvider, +} from './pub-sub.providers'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; +import { getPubSubToken } from './utils/token'; +import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants'; + +@Module({ + providers: [PubSubService], +}) +export class PubSubModule { + public static forRoot(options: PubSubOptions): DynamicModule { + const providers = [createPubSubProvider(options)]; + return { + global: true, + module: PubSubModule, + providers, + exports: providers, + }; + } + public static forRootAsync(config: PubSubAsyncConfig) { + const providers: Provider[] = [ + createOptionsProvider(config), + { + provide: getPubSubToken(config.name), + inject: [PUB_SUB_CONFIG_TOKEN], + useFactory: (options: PubSubOptions) => { + return createPubSubProvider({ + name: config.name, + ...options, + }); + }, + }, + ]; + return { + global: true, + module: PubSubModule, + imports: config.imports, + providers, + exports: providers, + }; + } +} diff --git a/src/commons/pub-sub/pub-sub.providers.ts b/src/commons/pub-sub/pub-sub.providers.ts new file mode 100644 index 0000000..de14e9c --- /dev/null +++ b/src/commons/pub-sub/pub-sub.providers.ts @@ -0,0 +1,23 @@ +import { Provider } from '@nestjs/common'; +import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { PubSub } from './pub-sub'; +import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants'; +import { getPubSubToken } from './utils/token'; + +export function createPubSubProvider(options: PubSubOptions): Provider { + return { + provide: getPubSubToken(options.name), + useFactory: () => { + return new PubSub(options); + }, + }; +} + +export function createOptionsProvider(config: PubSubAsyncConfig): Provider { + return { + provide: PUB_SUB_CONFIG_TOKEN, + useFactory: config.useFactory, + inject: config.inject || [], + }; +} diff --git a/src/commons/pub-sub/pub-sub.service.spec.ts b/src/commons/pub-sub/pub-sub.service.spec.ts new file mode 100644 index 0000000..652ea12 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PubSubService } from './pub-sub.service'; + +describe('PubsubService', () => { + let service: PubSubService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [PubSubService], + }).compile(); + + service = module.get(PubSubService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/commons/pub-sub/pub-sub.service.ts b/src/commons/pub-sub/pub-sub.service.ts new file mode 100644 index 0000000..12b7c29 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.service.ts @@ -0,0 +1,9 @@ +import { Injectable } from '@nestjs/common'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; + +@Injectable() +export class PubSubService { + private options = new Map(); + private pubClient; + private; +} diff --git a/src/commons/pub-sub/pub-sub.spec.ts b/src/commons/pub-sub/pub-sub.spec.ts new file mode 100644 index 0000000..b988b9d --- /dev/null +++ b/src/commons/pub-sub/pub-sub.spec.ts @@ -0,0 +1,87 @@ +import debug from 'debug'; +import { tap } from 'rxjs/operators'; +import { PubSub } from './pub-sub'; + +debug.enable('app:pubsub:*'); + +describe('PubSub', () => { + let instance: PubSub; + + beforeEach(async () => { + instance = new PubSub({ + name: 'default', + redis: { + host: 'localhost', + }, + }); + }); + + it('should be defined', () => { + expect(instance).toBeDefined(); + }); + + it('should can send and receive message', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe((val) => results.push(val)); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + }); + + it('should complete', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe((val) => results.push(val)); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await instance.finish('test'); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + }); + it('should error', async () => { + const arr = new Array(10) + .fill(undefined) + .map(() => Math.random().toString(36).slice(2, 8)); + const results: string[] = []; + let error: string; + instance + .message$('test') + .pipe( + tap((val) => { + console.log(val); + }), + ) + .subscribe({ + next: (val) => results.push(val), + error: (err) => (error = err.message), + }); + await new Promise((r) => setTimeout(r, 1000)); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await instance.throwError('test', 'TEST ERROR MESSAGE'); + await Promise.all([...arr.map((str) => instance.publish('test', str))]); + await new Promise((r) => setTimeout(r, 1000)); + expect(results).toMatchObject(arr); + expect(error).toEqual('TEST ERROR MESSAGE'); + }); +}); diff --git a/src/commons/pub-sub/pub-sub.ts b/src/commons/pub-sub/pub-sub.ts new file mode 100644 index 0000000..98853f8 --- /dev/null +++ b/src/commons/pub-sub/pub-sub.ts @@ -0,0 +1,110 @@ +import debug from 'debug'; +import { EventEmitter } from 'events'; +import IORedis, { Redis } from 'ioredis'; +import { from, fromEvent, Observable } from 'rxjs'; +import { filter, map, switchMap, takeWhile, tap } from 'rxjs/operators'; +import { ApplicationException } from '../exceptions/application.exception'; +import { PubSubOptions } from './interfaces/pub-sub-options.interface'; +import { + PubSubRawMessage, + PubSubRawNextMessage, +} from './interfaces/pub-sub-raw-message.interface'; + +const log = debug('app:pubsub:instance'); +export class PubSub extends EventEmitter { + pubRedis: Redis; + pSubRedis: Redis; + channels: string[] = []; + event$: Observable<[string, string, any]>; + + constructor(private readonly options: PubSubOptions) { + super(); + this.pubRedis = new IORedis(this.options.redis); + this.pSubRedis = new IORedis(this.options.redis); + + this.event$ = fromEvent<[string, string, string]>( + this.pSubRedis, + 'pmessage', + ).pipe( + map((ev) => { + try { + ev[2] = JSON.parse(ev[2]); + } catch (err) { + log('WARN: is not json'); + return null; + } + log('on message: %s %s %o', ...ev); + return ev; + }), + filter((v) => !!v), + ); + } + + async disconnect() { + log('disconnecting to redis...'); + this.pubRedis.disconnect(); + this.pSubRedis.disconnect(); + log('disconnected'); + } + + async registerChannel(channel: string) { + if (this.channels.includes(channel)) { + return; + } + this.channels.push(channel); + + return await this.pSubRedis.psubscribe(channel); + } + + private async redisPublish(channel: string, message: PubSubRawMessage) { + log.extend('publish')('channel: %s, message: %O', channel, message); + return await this.pubRedis.publish(channel, JSON.stringify(message)); + } + + async publish(channel: string, message: any): Promise { + return await this.redisPublish(channel, { + type: 'next', + message, + }); + } + + async finish(channel: string): Promise { + return await this.redisPublish(channel, { + type: 'complete', + }); + } + + async throwError(channel: string, error: string): Promise { + return await this.redisPublish(channel, { + type: 'error', + error, + }); + } + + message$ = (channel: string): Observable => { + return from(this.registerChannel(channel)) + .pipe(switchMap(() => this.event$)) + .pipe( + filter(([pattern]) => pattern === channel), + tap(([pattern, channel, message]) => { + log.extend('subscribe')( + 'channel: %s, match: %, message: %O', + channel, + pattern, + message, + ); + }), + takeWhile(([pattern, channel, message]) => { + if (pattern === channel) { + if (message.type === 'error') { + throw new ApplicationException(message.error); + } + return message.type !== 'complete'; + } + return true; + }), + map((ev) => ev[2] as PubSubRawMessage), + map((message: PubSubRawNextMessage) => message.message), + ); + }; +} diff --git a/src/commons/pub-sub/utils/token.ts b/src/commons/pub-sub/utils/token.ts new file mode 100644 index 0000000..b4f6472 --- /dev/null +++ b/src/commons/pub-sub/utils/token.ts @@ -0,0 +1,5 @@ +import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants'; + +export function getPubSubToken(name) { + return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`; +}