feat: pubsub base redis.
This commit is contained in:
		
							
								
								
									
										2
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							@@ -6,6 +6,8 @@
 | 
			
		||||
    "lpush",
 | 
			
		||||
    "lrange",
 | 
			
		||||
    "metatype",
 | 
			
		||||
    "pmessage",
 | 
			
		||||
    "psubscribe",
 | 
			
		||||
    "rpop",
 | 
			
		||||
    "rpush"
 | 
			
		||||
  ]
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										26
									
								
								package-lock.json
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										26
									
								
								package-lock.json
									
									
									
										generated
									
									
									
								
							@@ -27,6 +27,7 @@
 | 
			
		||||
        "debug": "^4.3.1",
 | 
			
		||||
        "graphql": "^15.5.0",
 | 
			
		||||
        "graphql-tools": "^7.0.2",
 | 
			
		||||
        "ioredis": "^4.25.0",
 | 
			
		||||
        "js-yaml": "^4.0.0",
 | 
			
		||||
        "nestjs-redis": "^1.2.8",
 | 
			
		||||
        "observable-to-async-generator": "^1.0.1-rc",
 | 
			
		||||
@@ -45,6 +46,7 @@
 | 
			
		||||
        "@types/body-parser": "^1.19.0",
 | 
			
		||||
        "@types/debug": "^4.1.5",
 | 
			
		||||
        "@types/express": "^4.17.8",
 | 
			
		||||
        "@types/ioredis": "^4.22.2",
 | 
			
		||||
        "@types/jest": "^26.0.15",
 | 
			
		||||
        "@types/js-yaml": "^4.0.0",
 | 
			
		||||
        "@types/node": "^14.14.6",
 | 
			
		||||
@@ -3278,9 +3280,9 @@
 | 
			
		||||
      "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA=="
 | 
			
		||||
    },
 | 
			
		||||
    "node_modules/@types/ioredis": {
 | 
			
		||||
      "version": "4.22.1",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz",
 | 
			
		||||
      "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==",
 | 
			
		||||
      "version": "4.22.2",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz",
 | 
			
		||||
      "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==",
 | 
			
		||||
      "dependencies": {
 | 
			
		||||
        "@types/node": "*"
 | 
			
		||||
      }
 | 
			
		||||
@@ -8553,9 +8555,9 @@
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    "node_modules/ioredis": {
 | 
			
		||||
      "version": "4.24.4",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz",
 | 
			
		||||
      "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==",
 | 
			
		||||
      "version": "4.25.0",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz",
 | 
			
		||||
      "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==",
 | 
			
		||||
      "dependencies": {
 | 
			
		||||
        "cluster-key-slot": "^1.1.0",
 | 
			
		||||
        "debug": "^4.3.1",
 | 
			
		||||
@@ -18258,9 +18260,9 @@
 | 
			
		||||
      "integrity": "sha512-2aoSC4UUbHDj2uCsCxcG/vRMXey/m17bC7UwitVm5hn22nI8O8Y9iDpA76Orc+DWkQ4zZrOKEshCqR/jSuXAHA=="
 | 
			
		||||
    },
 | 
			
		||||
    "@types/ioredis": {
 | 
			
		||||
      "version": "4.22.1",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.1.tgz",
 | 
			
		||||
      "integrity": "sha512-GxXT828fkvBeThO68ZJg8cD2haqea5ANBJaxA+UZqLranNkEnQ8N7QLPtykwWbN/sRQz75O7kj+PNmCKF4CEKw==",
 | 
			
		||||
      "version": "4.22.2",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.22.2.tgz",
 | 
			
		||||
      "integrity": "sha512-32nEh2Eq20xlgtIqVx1kkJkpqh5V1wqMsumK8Y9Izma/vW9k8Ro7sD+y4G6SyCqz8u9+HcssZ6+E83Modta38w==",
 | 
			
		||||
      "requires": {
 | 
			
		||||
        "@types/node": "*"
 | 
			
		||||
      }
 | 
			
		||||
@@ -22385,9 +22387,9 @@
 | 
			
		||||
      "dev": true
 | 
			
		||||
    },
 | 
			
		||||
    "ioredis": {
 | 
			
		||||
      "version": "4.24.4",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.24.4.tgz",
 | 
			
		||||
      "integrity": "sha512-v28xxBENyTmReC6lVTL7EkAPjVF8cuGtDEjGDi1Z2n7htsC2WdiDceZrCIPeuPiLa6kDFWHb1Y8O0upZROsMgA==",
 | 
			
		||||
      "version": "4.25.0",
 | 
			
		||||
      "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.25.0.tgz",
 | 
			
		||||
      "integrity": "sha512-UoeqXpZB05aerGD3gB9NiigMsAyph+N+GWH8+3lX1+26caVV03GkL6JoLxS2HCxyvqCWbNsVSZTAp5W12qe23A==",
 | 
			
		||||
      "requires": {
 | 
			
		||||
        "cluster-key-slot": "^1.1.0",
 | 
			
		||||
        "debug": "^4.3.1",
 | 
			
		||||
 
 | 
			
		||||
@@ -40,6 +40,7 @@
 | 
			
		||||
    "debug": "^4.3.1",
 | 
			
		||||
    "graphql": "^15.5.0",
 | 
			
		||||
    "graphql-tools": "^7.0.2",
 | 
			
		||||
    "ioredis": "^4.25.0",
 | 
			
		||||
    "js-yaml": "^4.0.0",
 | 
			
		||||
    "nestjs-redis": "^1.2.8",
 | 
			
		||||
    "observable-to-async-generator": "^1.0.1-rc",
 | 
			
		||||
@@ -58,6 +59,7 @@
 | 
			
		||||
    "@types/body-parser": "^1.19.0",
 | 
			
		||||
    "@types/debug": "^4.1.5",
 | 
			
		||||
    "@types/express": "^4.17.8",
 | 
			
		||||
    "@types/ioredis": "^4.22.2",
 | 
			
		||||
    "@types/jest": "^26.0.15",
 | 
			
		||||
    "@types/js-yaml": "^4.0.0",
 | 
			
		||||
    "@types/node": "^14.14.6",
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,10 @@
 | 
			
		||||
import { Module } from '@nestjs/common';
 | 
			
		||||
import { PasswordConverter } from './services/password-converter';
 | 
			
		||||
import { PubSubModule } from './pub-sub/pub-sub.module';
 | 
			
		||||
 | 
			
		||||
@Module({
 | 
			
		||||
  providers: [PasswordConverter],
 | 
			
		||||
  exports: [PasswordConverter],
 | 
			
		||||
  imports: [PubSubModule],
 | 
			
		||||
})
 | 
			
		||||
export class CommonsModule {}
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,8 @@
 | 
			
		||||
import { ModuleMetadata } from '@nestjs/common';
 | 
			
		||||
import { PubSubOptions } from 'graphql-subscriptions';
 | 
			
		||||
 | 
			
		||||
export interface PubSubAsyncConfig extends Pick<ModuleMetadata, 'imports'> {
 | 
			
		||||
  name?: string;
 | 
			
		||||
  useFactory: (...args: any[]) => Promise<PubSubOptions> | PubSubOptions;
 | 
			
		||||
  inject?: any[];
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,6 @@
 | 
			
		||||
import { RedisOptions } from 'ioredis';
 | 
			
		||||
 | 
			
		||||
export interface PubSubOptions {
 | 
			
		||||
  name: string;
 | 
			
		||||
  redis: RedisOptions;
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,15 @@
 | 
			
		||||
export type PubSubRawMessage<T> =
 | 
			
		||||
  | PubSubRawNextMessage<T>
 | 
			
		||||
  | PubSubRawErrorMessage<T>
 | 
			
		||||
  | PubSubRawCompleteMessage<T>;
 | 
			
		||||
export interface PubSubRawNextMessage<T> {
 | 
			
		||||
  type: 'next';
 | 
			
		||||
  message: T;
 | 
			
		||||
}
 | 
			
		||||
export interface PubSubRawErrorMessage<T> {
 | 
			
		||||
  type: 'error';
 | 
			
		||||
  error: string;
 | 
			
		||||
}
 | 
			
		||||
export interface PubSubRawCompleteMessage<T> {
 | 
			
		||||
  type: 'complete';
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										2
									
								
								src/commons/pub-sub/pub-sub.constants.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								src/commons/pub-sub/pub-sub.constants.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,2 @@
 | 
			
		||||
export const DEFAULT_PUB_SUB_NAME = 'default';
 | 
			
		||||
export const PUB_SUB_CONFIG_TOKEN = 'pub_sub_config_token';
 | 
			
		||||
							
								
								
									
										47
									
								
								src/commons/pub-sub/pub-sub.module.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								src/commons/pub-sub/pub-sub.module.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,47 @@
 | 
			
		||||
import { DynamicModule, Module, Provider } from '@nestjs/common';
 | 
			
		||||
import { PubSubService } from './pub-sub.service';
 | 
			
		||||
import {
 | 
			
		||||
  createOptionsProvider,
 | 
			
		||||
  createPubSubProvider,
 | 
			
		||||
} from './pub-sub.providers';
 | 
			
		||||
import { PubSubOptions } from './interfaces/pub-sub-options.interface';
 | 
			
		||||
import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface';
 | 
			
		||||
import { getPubSubToken } from './utils/token';
 | 
			
		||||
import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants';
 | 
			
		||||
 | 
			
		||||
@Module({
 | 
			
		||||
  providers: [PubSubService],
 | 
			
		||||
})
 | 
			
		||||
export class PubSubModule {
 | 
			
		||||
  public static forRoot(options: PubSubOptions): DynamicModule {
 | 
			
		||||
    const providers = [createPubSubProvider(options)];
 | 
			
		||||
    return {
 | 
			
		||||
      global: true,
 | 
			
		||||
      module: PubSubModule,
 | 
			
		||||
      providers,
 | 
			
		||||
      exports: providers,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
  public static forRootAsync(config: PubSubAsyncConfig) {
 | 
			
		||||
    const providers: Provider[] = [
 | 
			
		||||
      createOptionsProvider(config),
 | 
			
		||||
      {
 | 
			
		||||
        provide: getPubSubToken(config.name),
 | 
			
		||||
        inject: [PUB_SUB_CONFIG_TOKEN],
 | 
			
		||||
        useFactory: (options: PubSubOptions) => {
 | 
			
		||||
          return createPubSubProvider({
 | 
			
		||||
            name: config.name,
 | 
			
		||||
            ...options,
 | 
			
		||||
          });
 | 
			
		||||
        },
 | 
			
		||||
      },
 | 
			
		||||
    ];
 | 
			
		||||
    return {
 | 
			
		||||
      global: true,
 | 
			
		||||
      module: PubSubModule,
 | 
			
		||||
      imports: config.imports,
 | 
			
		||||
      providers,
 | 
			
		||||
      exports: providers,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										23
									
								
								src/commons/pub-sub/pub-sub.providers.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								src/commons/pub-sub/pub-sub.providers.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,23 @@
 | 
			
		||||
import { Provider } from '@nestjs/common';
 | 
			
		||||
import { PubSubAsyncConfig } from './interfaces/pub-sub-async-config.interface';
 | 
			
		||||
import { PubSubOptions } from './interfaces/pub-sub-options.interface';
 | 
			
		||||
import { PubSub } from './pub-sub';
 | 
			
		||||
import { PUB_SUB_CONFIG_TOKEN } from './pub-sub.constants';
 | 
			
		||||
import { getPubSubToken } from './utils/token';
 | 
			
		||||
 | 
			
		||||
export function createPubSubProvider(options: PubSubOptions): Provider {
 | 
			
		||||
  return {
 | 
			
		||||
    provide: getPubSubToken(options.name),
 | 
			
		||||
    useFactory: () => {
 | 
			
		||||
      return new PubSub(options);
 | 
			
		||||
    },
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createOptionsProvider(config: PubSubAsyncConfig): Provider {
 | 
			
		||||
  return {
 | 
			
		||||
    provide: PUB_SUB_CONFIG_TOKEN,
 | 
			
		||||
    useFactory: config.useFactory,
 | 
			
		||||
    inject: config.inject || [],
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										18
									
								
								src/commons/pub-sub/pub-sub.service.spec.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								src/commons/pub-sub/pub-sub.service.spec.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,18 @@
 | 
			
		||||
import { Test, TestingModule } from '@nestjs/testing';
 | 
			
		||||
import { PubSubService } from './pub-sub.service';
 | 
			
		||||
 | 
			
		||||
describe('PubsubService', () => {
 | 
			
		||||
  let service: PubSubService;
 | 
			
		||||
 | 
			
		||||
  beforeEach(async () => {
 | 
			
		||||
    const module: TestingModule = await Test.createTestingModule({
 | 
			
		||||
      providers: [PubSubService],
 | 
			
		||||
    }).compile();
 | 
			
		||||
 | 
			
		||||
    service = module.get<PubSubService>(PubSubService);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should be defined', () => {
 | 
			
		||||
    expect(service).toBeDefined();
 | 
			
		||||
  });
 | 
			
		||||
});
 | 
			
		||||
							
								
								
									
										9
									
								
								src/commons/pub-sub/pub-sub.service.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								src/commons/pub-sub/pub-sub.service.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,9 @@
 | 
			
		||||
import { Injectable } from '@nestjs/common';
 | 
			
		||||
import { PubSubOptions } from './interfaces/pub-sub-options.interface';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class PubSubService {
 | 
			
		||||
  private options = new Map<string, PubSubOptions>();
 | 
			
		||||
  private pubClient;
 | 
			
		||||
  private;
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										87
									
								
								src/commons/pub-sub/pub-sub.spec.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								src/commons/pub-sub/pub-sub.spec.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,87 @@
 | 
			
		||||
import debug from 'debug';
 | 
			
		||||
import { tap } from 'rxjs/operators';
 | 
			
		||||
import { PubSub } from './pub-sub';
 | 
			
		||||
 | 
			
		||||
debug.enable('app:pubsub:*');
 | 
			
		||||
 | 
			
		||||
describe('PubSub', () => {
 | 
			
		||||
  let instance: PubSub;
 | 
			
		||||
 | 
			
		||||
  beforeEach(async () => {
 | 
			
		||||
    instance = new PubSub({
 | 
			
		||||
      name: 'default',
 | 
			
		||||
      redis: {
 | 
			
		||||
        host: 'localhost',
 | 
			
		||||
      },
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should be defined', () => {
 | 
			
		||||
    expect(instance).toBeDefined();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should can send and receive message', async () => {
 | 
			
		||||
    const arr = new Array(10)
 | 
			
		||||
      .fill(undefined)
 | 
			
		||||
      .map(() => Math.random().toString(36).slice(2, 8));
 | 
			
		||||
    const results: string[] = [];
 | 
			
		||||
    instance
 | 
			
		||||
      .message$<string>('test')
 | 
			
		||||
      .pipe(
 | 
			
		||||
        tap((val) => {
 | 
			
		||||
          console.log(val);
 | 
			
		||||
        }),
 | 
			
		||||
      )
 | 
			
		||||
      .subscribe((val) => results.push(val));
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    await Promise.all([...arr.map((str) => instance.publish('test', str))]);
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    expect(results).toMatchObject(arr);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should complete', async () => {
 | 
			
		||||
    const arr = new Array(10)
 | 
			
		||||
      .fill(undefined)
 | 
			
		||||
      .map(() => Math.random().toString(36).slice(2, 8));
 | 
			
		||||
    const results: string[] = [];
 | 
			
		||||
    instance
 | 
			
		||||
      .message$<string>('test')
 | 
			
		||||
      .pipe(
 | 
			
		||||
        tap((val) => {
 | 
			
		||||
          console.log(val);
 | 
			
		||||
        }),
 | 
			
		||||
      )
 | 
			
		||||
      .subscribe((val) => results.push(val));
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    await Promise.all([...arr.map((str) => instance.publish('test', str))]);
 | 
			
		||||
    await instance.finish('test');
 | 
			
		||||
    await Promise.all([...arr.map((str) => instance.publish('test', str))]);
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    expect(results).toMatchObject(arr);
 | 
			
		||||
  });
 | 
			
		||||
  it('should error', async () => {
 | 
			
		||||
    const arr = new Array(10)
 | 
			
		||||
      .fill(undefined)
 | 
			
		||||
      .map(() => Math.random().toString(36).slice(2, 8));
 | 
			
		||||
    const results: string[] = [];
 | 
			
		||||
    let error: string;
 | 
			
		||||
    instance
 | 
			
		||||
      .message$<string>('test')
 | 
			
		||||
      .pipe(
 | 
			
		||||
        tap((val) => {
 | 
			
		||||
          console.log(val);
 | 
			
		||||
        }),
 | 
			
		||||
      )
 | 
			
		||||
      .subscribe({
 | 
			
		||||
        next: (val) => results.push(val),
 | 
			
		||||
        error: (err) => (error = err.message),
 | 
			
		||||
      });
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    await Promise.all([...arr.map((str) => instance.publish('test', str))]);
 | 
			
		||||
    await instance.throwError('test', 'TEST ERROR MESSAGE');
 | 
			
		||||
    await Promise.all([...arr.map((str) => instance.publish('test', str))]);
 | 
			
		||||
    await new Promise((r) => setTimeout(r, 1000));
 | 
			
		||||
    expect(results).toMatchObject(arr);
 | 
			
		||||
    expect(error).toEqual('TEST ERROR MESSAGE');
 | 
			
		||||
  });
 | 
			
		||||
});
 | 
			
		||||
							
								
								
									
										110
									
								
								src/commons/pub-sub/pub-sub.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										110
									
								
								src/commons/pub-sub/pub-sub.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,110 @@
 | 
			
		||||
import debug from 'debug';
 | 
			
		||||
import { EventEmitter } from 'events';
 | 
			
		||||
import IORedis, { Redis } from 'ioredis';
 | 
			
		||||
import { from, fromEvent, Observable } from 'rxjs';
 | 
			
		||||
import { filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
 | 
			
		||||
import { ApplicationException } from '../exceptions/application.exception';
 | 
			
		||||
import { PubSubOptions } from './interfaces/pub-sub-options.interface';
 | 
			
		||||
import {
 | 
			
		||||
  PubSubRawMessage,
 | 
			
		||||
  PubSubRawNextMessage,
 | 
			
		||||
} from './interfaces/pub-sub-raw-message.interface';
 | 
			
		||||
 | 
			
		||||
const log = debug('app:pubsub:instance');
 | 
			
		||||
export class PubSub extends EventEmitter {
 | 
			
		||||
  pubRedis: Redis;
 | 
			
		||||
  pSubRedis: Redis;
 | 
			
		||||
  channels: string[] = [];
 | 
			
		||||
  event$: Observable<[string, string, any]>;
 | 
			
		||||
 | 
			
		||||
  constructor(private readonly options: PubSubOptions) {
 | 
			
		||||
    super();
 | 
			
		||||
    this.pubRedis = new IORedis(this.options.redis);
 | 
			
		||||
    this.pSubRedis = new IORedis(this.options.redis);
 | 
			
		||||
 | 
			
		||||
    this.event$ = fromEvent<[string, string, string]>(
 | 
			
		||||
      this.pSubRedis,
 | 
			
		||||
      'pmessage',
 | 
			
		||||
    ).pipe(
 | 
			
		||||
      map((ev) => {
 | 
			
		||||
        try {
 | 
			
		||||
          ev[2] = JSON.parse(ev[2]);
 | 
			
		||||
        } catch (err) {
 | 
			
		||||
          log('WARN: is not json');
 | 
			
		||||
          return null;
 | 
			
		||||
        }
 | 
			
		||||
        log('on message: %s %s %o', ...ev);
 | 
			
		||||
        return ev;
 | 
			
		||||
      }),
 | 
			
		||||
      filter((v) => !!v),
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async disconnect() {
 | 
			
		||||
    log('disconnecting to redis...');
 | 
			
		||||
    this.pubRedis.disconnect();
 | 
			
		||||
    this.pSubRedis.disconnect();
 | 
			
		||||
    log('disconnected');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async registerChannel(channel: string) {
 | 
			
		||||
    if (this.channels.includes(channel)) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    this.channels.push(channel);
 | 
			
		||||
 | 
			
		||||
    return await this.pSubRedis.psubscribe(channel);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async redisPublish<T>(channel: string, message: PubSubRawMessage<T>) {
 | 
			
		||||
    log.extend('publish')('channel: %s, message: %O', channel, message);
 | 
			
		||||
    return await this.pubRedis.publish(channel, JSON.stringify(message));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async publish(channel: string, message: any): Promise<number> {
 | 
			
		||||
    return await this.redisPublish(channel, {
 | 
			
		||||
      type: 'next',
 | 
			
		||||
      message,
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async finish(channel: string): Promise<number> {
 | 
			
		||||
    return await this.redisPublish(channel, {
 | 
			
		||||
      type: 'complete',
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async throwError(channel: string, error: string): Promise<number> {
 | 
			
		||||
    return await this.redisPublish(channel, {
 | 
			
		||||
      type: 'error',
 | 
			
		||||
      error,
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  message$ = <T>(channel: string): Observable<T> => {
 | 
			
		||||
    return from(this.registerChannel(channel))
 | 
			
		||||
      .pipe(switchMap(() => this.event$))
 | 
			
		||||
      .pipe(
 | 
			
		||||
        filter(([pattern]) => pattern === channel),
 | 
			
		||||
        tap(([pattern, channel, message]) => {
 | 
			
		||||
          log.extend('subscribe')(
 | 
			
		||||
            'channel: %s, match: %, message: %O',
 | 
			
		||||
            channel,
 | 
			
		||||
            pattern,
 | 
			
		||||
            message,
 | 
			
		||||
          );
 | 
			
		||||
        }),
 | 
			
		||||
        takeWhile(([pattern, channel, message]) => {
 | 
			
		||||
          if (pattern === channel) {
 | 
			
		||||
            if (message.type === 'error') {
 | 
			
		||||
              throw new ApplicationException(message.error);
 | 
			
		||||
            }
 | 
			
		||||
            return message.type !== 'complete';
 | 
			
		||||
          }
 | 
			
		||||
          return true;
 | 
			
		||||
        }),
 | 
			
		||||
        map((ev) => ev[2] as PubSubRawMessage<T>),
 | 
			
		||||
        map((message: PubSubRawNextMessage<T>) => message.message),
 | 
			
		||||
      );
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										5
									
								
								src/commons/pub-sub/utils/token.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								src/commons/pub-sub/utils/token.ts
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,5 @@
 | 
			
		||||
import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants';
 | 
			
		||||
 | 
			
		||||
export function getPubSubToken(name) {
 | 
			
		||||
  return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`;
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user