feat: 任务完成后更新数据库中的数据。
This commit is contained in:
parent
0c3310d3a5
commit
133439bb49
@ -18,6 +18,7 @@ describe('PipelineTaskFlushService', () => {
|
|||||||
const redisClient = {
|
const redisClient = {
|
||||||
rpush: jest.fn(() => Promise.resolve()),
|
rpush: jest.fn(() => Promise.resolve()),
|
||||||
lrange: jest.fn(() => Promise.resolve()),
|
lrange: jest.fn(() => Promise.resolve()),
|
||||||
|
expire: jest.fn(() => Promise.resolve()),
|
||||||
};
|
};
|
||||||
const module: TestingModule = await Test.createTestingModule({
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
providers: [
|
providers: [
|
||||||
|
@ -37,11 +37,15 @@ export class PipelineTaskFlushService {
|
|||||||
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
||||||
await client.expire(this.getKey(message.taskId), 600); // ten minutes
|
await client.expire(this.getKey(message.taskId), 600); // ten minutes
|
||||||
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
|
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
|
||||||
this.amqpConnection.request({
|
try {
|
||||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
await this.amqpConnection.request({
|
||||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
payload: { taskId: message.taskId, status: message.status },
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
});
|
payload: { taskId: message.taskId, status: message.status },
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
console.log(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,26 +4,15 @@ import { getRepositoryToken } from '@nestjs/typeorm';
|
|||||||
import { PipelineTask } from './pipeline-task.entity';
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
import { Pipeline } from '../pipelines/pipeline.entity';
|
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||||
import { Repository } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
import { Queue } from 'bull';
|
|
||||||
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
|
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
|
||||||
|
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
|
||||||
|
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
|
||||||
|
|
||||||
describe('PipelineTasksService', () => {
|
describe('PipelineTasksService', () => {
|
||||||
let service: PipelineTasksService;
|
let service: PipelineTasksService;
|
||||||
let module: TestingModule;
|
let module: TestingModule;
|
||||||
let taskRepository: Repository<PipelineTask>;
|
let taskRepository: Repository<PipelineTask>;
|
||||||
let pipelineRepository: Repository<Pipeline>;
|
let pipelineRepository: Repository<Pipeline>;
|
||||||
const getBasePipeline = () =>
|
|
||||||
({
|
|
||||||
id: 'test',
|
|
||||||
name: '测试流水线',
|
|
||||||
branch: 'master',
|
|
||||||
workUnitMetadata: {},
|
|
||||||
project: {
|
|
||||||
id: 'test-project',
|
|
||||||
},
|
|
||||||
} as Pipeline);
|
|
||||||
let redisClient;
|
|
||||||
let taskQueue: Queue;
|
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
module = await Test.createTestingModule({
|
module = await Test.createTestingModule({
|
||||||
@ -41,6 +30,14 @@ describe('PipelineTasksService', () => {
|
|||||||
provide: AmqpConnection,
|
provide: AmqpConnection,
|
||||||
useValue: {},
|
useValue: {},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: PipelineTaskFlushService,
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: getLoggerToken(PipelineTasksService.name),
|
||||||
|
useValue: new PinoLogger({}),
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -5,13 +5,10 @@ import { Repository } from 'typeorm';
|
|||||||
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
||||||
import { Pipeline } from '../pipelines/pipeline.entity';
|
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||||
import debug from 'debug';
|
import debug from 'debug';
|
||||||
import {
|
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
|
||||||
AmqpConnection,
|
|
||||||
RabbitRPC,
|
|
||||||
RabbitSubscribe,
|
|
||||||
} from '@golevelup/nestjs-rabbitmq';
|
|
||||||
import {
|
import {
|
||||||
EXCHANGE_PIPELINE_TASK_FANOUT,
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
QUEUE_PIPELINE_TASK_DONE,
|
QUEUE_PIPELINE_TASK_DONE,
|
||||||
ROUTE_PIPELINE_TASK_DONE,
|
ROUTE_PIPELINE_TASK_DONE,
|
||||||
} from './pipeline-tasks.constants';
|
} from './pipeline-tasks.constants';
|
||||||
@ -79,7 +76,7 @@ export class PipelineTasksService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@RabbitRPC({
|
@RabbitRPC({
|
||||||
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
queue: QUEUE_PIPELINE_TASK_DONE,
|
queue: QUEUE_PIPELINE_TASK_DONE,
|
||||||
queueOptions: {
|
queueOptions: {
|
||||||
@ -87,13 +84,13 @@ export class PipelineTasksService {
|
|||||||
durable: true,
|
durable: true,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
async updateByEvent({ id }: { id: string }) {
|
async updateByEvent({ taskId }: { taskId: string }) {
|
||||||
try {
|
try {
|
||||||
const [events, task] = await Promise.all([
|
const [events, task] = await Promise.all([
|
||||||
this.eventFlushService.read(id),
|
this.eventFlushService.read(taskId),
|
||||||
this.findTaskById(id),
|
this.findTaskById(taskId),
|
||||||
]);
|
]);
|
||||||
this.logger.info('[updateByEvent] start. taskId: %s', id);
|
this.logger.info('[updateByEvent] start. taskId: %s', taskId);
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
if (isNil(event.unit)) {
|
if (isNil(event.unit)) {
|
||||||
@ -130,10 +127,15 @@ export class PipelineTasksService {
|
|||||||
l.status = event.status;
|
l.status = event.status;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await this.repository.update({ id }, task);
|
await this.repository.update({ id: taskId }, task);
|
||||||
this.logger.info('[updateByEvent] success. taskId: %s', id);
|
return task;
|
||||||
|
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id);
|
this.logger.error(
|
||||||
|
{ error },
|
||||||
|
'[updateByEvent] failed. taskId: %s',
|
||||||
|
taskId,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user