feat(pipeline-task): flush service.
parent 20612d4301
commit ead32a1204
@@ -11,3 +11,5 @@ registerEnumType(TaskStatuses, {
   name: 'TaskStatuses',
   description: '任务状态',
 });
+
+export const terminalTaskStatuses = [TaskStatuses.success, TaskStatuses.failed];
src/pipeline-tasks/pipeline-task-flush.service.spec.ts (new file, 80 lines)
@@ -0,0 +1,80 @@
+import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
+import { Test, TestingModule } from '@nestjs/testing';
+import { RedisService } from 'nestjs-redis';
+import { PipelineTaskFlushService } from './pipeline-task-flush.service';
+import { PipelineTaskEvent } from './models/pipeline-task-event';
+import { TaskStatuses } from './enums/task-statuses.enum';
+import {
+  EXCHANGE_PIPELINE_TASK_TOPIC,
+  ROUTE_PIPELINE_TASK_DONE,
+} from './pipeline-tasks.constants';
+
+describe('PipelineTaskFlushService', () => {
+  let service: PipelineTaskFlushService;
+  let redisService: RedisService;
+  let amqpConnection: AmqpConnection;
+
+  beforeEach(async () => {
+    const redisClient = {
+      rpush: jest.fn(() => Promise.resolve()),
+      lrange: jest.fn(() => Promise.resolve()),
+    };
+    const module: TestingModule = await Test.createTestingModule({
+      providers: [
+        PipelineTaskFlushService,
+        {
+          provide: RedisService,
+          useValue: {
+            getClient() {
+              return redisClient;
+            },
+          },
+        },
+        {
+          provide: AmqpConnection,
+          useValue: {
+            request: jest.fn(() => Promise.resolve()),
+          },
+        },
+      ],
+    }).compile();
+
+    service = module.get<PipelineTaskFlushService>(PipelineTaskFlushService);
+    redisService = module.get<RedisService>(RedisService);
+    amqpConnection = module.get<AmqpConnection>(AmqpConnection);
+  });
+
+  it('should be defined', () => {
+    expect(service).toBeDefined();
+  });
+
+  describe('write', () => {
+    it('normal', async () => {
+      const testEvent = new PipelineTaskEvent();
+      testEvent.taskId = 'test';
+      testEvent.status = TaskStatuses.working;
+      const rpush = jest.spyOn(redisService.getClient(), 'rpush');
+      const request = jest.spyOn(amqpConnection, 'request');
+      await service.write(testEvent);
+      expect(rpush).toBeCalledTimes(1);
+      expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
+      expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
+      expect(request).toBeCalledTimes(0);
+    });
+    it('event for which task done', async () => {
+      const testEvent = new PipelineTaskEvent();
+      testEvent.taskId = 'test';
+      testEvent.status = TaskStatuses.success;
+      const rpush = jest.spyOn(redisService.getClient(), 'rpush');
+      const request = jest.spyOn(amqpConnection, 'request');
+      await service.write(testEvent);
+      expect(rpush).toBeCalledTimes(1);
+      expect(request).toBeCalledTimes(1);
+      expect(request.mock.calls[0][0]).toMatchObject({
+        exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
+        routingKey: ROUTE_PIPELINE_TASK_DONE,
+        payload: { taskId: 'test', status: TaskStatuses.success },
+      });
+    });
+  });
+});
src/pipeline-tasks/pipeline-task-flush.service.ts (new file, 57 lines)
@@ -0,0 +1,57 @@
+import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
+import { Injectable } from '@nestjs/common';
+import { deserialize } from 'class-transformer';
+import { RedisService } from 'nestjs-redis';
+import { isNil } from 'ramda';
+import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
+import { terminalTaskStatuses } from './enums/task-statuses.enum';
+import { PipelineTaskEvent } from './models/pipeline-task-event';
+import {
+  EXCHANGE_PIPELINE_TASK_TOPIC,
+  ROUTE_PIPELINE_TASK_DONE,
+} from './pipeline-tasks.constants';
+import {
+  EXCHANGE_PIPELINE_TASK_FANOUT,
+  ROUTE_PIPELINE_TASK_LOG,
+  QUEUE_WRITE_PIPELINE_TASK_LOG,
+} from './pipeline-tasks.constants';
+
+@Injectable()
+export class PipelineTaskFlushService {
+  constructor(
+    private readonly redisService: RedisService,
+    private readonly amqpConnection: AmqpConnection,
+  ) {}
+
+  @RabbitSubscribe({
+    exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
+    routingKey: ROUTE_PIPELINE_TASK_LOG,
+    queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG),
+    queueOptions: {
+      autoDelete: true,
+    },
+  })
+  async write(message: PipelineTaskEvent) {
+    await this.redisService
+      .getClient()
+      .rpush(this.getKey(message.taskId), JSON.stringify(message));
+    if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
+      this.amqpConnection.request({
+        exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
+        routingKey: ROUTE_PIPELINE_TASK_DONE,
+        payload: { taskId: message.taskId, status: message.status },
+      });
+    }
+  }
+
+  async read(taskId: string) {
+    const raw = await this.redisService
+      .getClient()
+      .lrange(this.getKey(taskId), 0, -1);
+    return raw.map((it) => deserialize(PipelineTaskEvent, it));
+  }
+
+  private getKey(taskId: string) {
+    return `p-task:log:${taskId}`;
+  }
+}
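Note (not part of this commit): write() publishes the task-done message with amqpConnection.request, an RPC-style call that waits for a reply, so the receiving side would typically register an @RabbitRPC handler and could replay the buffered events through read(). Below is a minimal sketch of such a consumer, assuming the same constants and module wiring; the class name, queue name, and handler are illustrative only and do not appear in this commit.

import { RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
import {
  EXCHANGE_PIPELINE_TASK_TOPIC,
  ROUTE_PIPELINE_TASK_DONE,
} from './pipeline-tasks.constants';

@Injectable()
export class PipelineTaskDoneHandler {
  constructor(private readonly flushService: PipelineTaskFlushService) {}

  // Hypothetical consumer of the task-done message; the queue name is assumed.
  @RabbitRPC({
    exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
    routingKey: ROUTE_PIPELINE_TASK_DONE,
    queue: 'handle-pipeline-task-done',
  })
  async handleTaskDone(message: { taskId: string; status: TaskStatuses }) {
    // Replay every event the flush service buffered in Redis for this task,
    // e.g. to persist it before the list is discarded.
    const events = await this.flushService.read(message.taskId);
    return { taskId: message.taskId, count: events.length };
  }
}

Since write() does not await the request, the reply returned by such a handler is effectively ignored there.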
@@ -23,13 +23,17 @@ describe('PipelineTaskRunner', () => {
   it('normal', async () => {
     const event = new PipelineTaskEvent();
     event.taskId = 'test';
-    event.emittedAt = new Date().toISOString() as any;
+    const emittedAt = new Date();
+    event.emittedAt = emittedAt.toISOString() as any;
     const message$ = logger.getMessage$('test');

     let receiveEvent;
     message$.pipe(take(1)).subscribe((value) => (receiveEvent = value));
     await logger.handleEvent(event);
-    expect(receiveEvent).toEqual(event);
+    expect(receiveEvent).toMatchObject({
+      ...event,
+      emittedAt,
+    });
   });
   it('no match', async () => {
     const event = new PipelineTaskEvent();
@@ -2,7 +2,7 @@ import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
 import { Injectable, OnModuleDestroy } from '@nestjs/common';
 import { plainToClass } from 'class-transformer';
 import { Observable, Subject } from 'rxjs';
-import { filter, publish, tap } from 'rxjs/operators';
+import { filter } from 'rxjs/operators';
 import { PipelineTaskEvent } from './models/pipeline-task-event';
 import {
   EXCHANGE_PIPELINE_TASK_FANOUT,
@@ -14,20 +14,21 @@ import {
 export class PipelineTaskLogger implements OnModuleDestroy {
   private readonly messageSubject = new Subject<PipelineTaskEvent>();
   private readonly message$: Observable<PipelineTaskEvent> = this.messageSubject.pipe();

   @RabbitSubscribe({
     exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
     routingKey: ROUTE_PIPELINE_TASK_LOG,
+    queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
+    queueOptions: {
+      autoDelete: true,
+    },
   })
   async handleEvent(message: PipelineTaskEvent) {
     this.messageSubject.next(plainToClass(PipelineTaskEvent, message));
   }

   getMessage$(taskId: string) {
-    return this.message$.pipe(
-      tap((val) => console.log(val)),
-      filter((event) => event.taskId === taskId),
-    );
+    return this.message$.pipe(filter((event) => event.taskId === taskId));
   }

   onModuleDestroy() {
@@ -2,3 +2,5 @@ export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic';
 export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout';
 export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log';
 export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
+export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
+export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
@@ -10,8 +10,12 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
 import { ConfigModule, ConfigService } from '@nestjs/config';
 import { PipelineTaskRunner } from './pipeline-task.runner';
 import { spawn } from 'child_process';
-import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants';
+import {
+  EXCHANGE_PIPELINE_TASK_FANOUT,
+  EXCHANGE_PIPELINE_TASK_TOPIC,
+} from './pipeline-tasks.constants';
 import { PipelineTaskLogger } from './pipeline-task.logger';
+import { PipelineTaskFlushService } from './pipeline-task-flush.service';

 @Module({
   imports: [
@@ -56,6 +60,14 @@ import { PipelineTaskLogger } from './pipeline-task.logger';
               autoDelete: true,
             },
           },
+          {
+            name: EXCHANGE_PIPELINE_TASK_TOPIC,
+            type: 'topic',
+            options: {
+              durable: false,
+              autoDelete: true,
+            },
+          },
         ],
       }),
       inject: [ConfigService],
@@ -70,6 +82,7 @@ import { PipelineTaskLogger } from './pipeline-task.logger';
       provide: 'spawn',
       useValue: spawn,
     },
+    PipelineTaskFlushService,
   ],
   exports: [PipelineTasksService],
 })