Compare commits
No commits in common. "37f8ae19be2f123fcb3473dfa4dc34d96fed0468" and "646f68d2989760c66d1224c26d23a6d8079a34d7" have entirely different histories.
37f8ae19be
...
646f68d298
@ -18,7 +18,6 @@ describe('PipelineTaskFlushService', () => {
|
||||
const redisClient = {
|
||||
rpush: jest.fn(() => Promise.resolve()),
|
||||
lrange: jest.fn(() => Promise.resolve()),
|
||||
expire: jest.fn(() => Promise.resolve()),
|
||||
};
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
|
@ -37,15 +37,11 @@ export class PipelineTaskFlushService {
|
||||
await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
||||
await client.expire(this.getKey(message.taskId), 600); // ten minutes
|
||||
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
|
||||
try {
|
||||
await this.amqpConnection.request({
|
||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||
payload: { taskId: message.taskId, status: message.status },
|
||||
});
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
}
|
||||
this.amqpConnection.request({
|
||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||
payload: { taskId: message.taskId, status: message.status },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4,15 +4,26 @@ import { getRepositoryToken } from '@nestjs/typeorm';
|
||||
import { PipelineTask } from './pipeline-task.entity';
|
||||
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||
import { Repository } from 'typeorm';
|
||||
import { Queue } from 'bull';
|
||||
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
|
||||
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
|
||||
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
|
||||
|
||||
describe('PipelineTasksService', () => {
|
||||
let service: PipelineTasksService;
|
||||
let module: TestingModule;
|
||||
let taskRepository: Repository<PipelineTask>;
|
||||
let pipelineRepository: Repository<Pipeline>;
|
||||
const getBasePipeline = () =>
|
||||
({
|
||||
id: 'test',
|
||||
name: '测试流水线',
|
||||
branch: 'master',
|
||||
workUnitMetadata: {},
|
||||
project: {
|
||||
id: 'test-project',
|
||||
},
|
||||
} as Pipeline);
|
||||
let redisClient;
|
||||
let taskQueue: Queue;
|
||||
|
||||
beforeEach(async () => {
|
||||
module = await Test.createTestingModule({
|
||||
@ -30,14 +41,6 @@ describe('PipelineTasksService', () => {
|
||||
provide: AmqpConnection,
|
||||
useValue: {},
|
||||
},
|
||||
{
|
||||
provide: PipelineTaskFlushService,
|
||||
useValue: {},
|
||||
},
|
||||
{
|
||||
provide: getLoggerToken(PipelineTasksService.name),
|
||||
useValue: new PinoLogger({}),
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
|
@ -8,7 +8,6 @@ import debug from 'debug';
|
||||
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
|
||||
import {
|
||||
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||
QUEUE_PIPELINE_TASK_DONE,
|
||||
ROUTE_PIPELINE_TASK_DONE,
|
||||
} from './pipeline-tasks.constants';
|
||||
@ -76,7 +75,7 @@ export class PipelineTasksService {
|
||||
}
|
||||
|
||||
@RabbitRPC({
|
||||
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||
queue: QUEUE_PIPELINE_TASK_DONE,
|
||||
queueOptions: {
|
||||
@ -84,13 +83,13 @@ export class PipelineTasksService {
|
||||
durable: true,
|
||||
},
|
||||
})
|
||||
async updateByEvent({ taskId }: { taskId: string }) {
|
||||
async updateByEvent({ id }: { id: string }) {
|
||||
try {
|
||||
const [events, task] = await Promise.all([
|
||||
this.eventFlushService.read(taskId),
|
||||
this.findTaskById(taskId),
|
||||
this.eventFlushService.read(id),
|
||||
this.findTaskById(id),
|
||||
]);
|
||||
this.logger.info('[updateByEvent] start. taskId: %s', taskId);
|
||||
this.logger.info('[updateByEvent] start. taskId: %s', id);
|
||||
|
||||
for (const event of events) {
|
||||
if (isNil(event.unit)) {
|
||||
@ -127,15 +126,10 @@ export class PipelineTasksService {
|
||||
l.status = event.status;
|
||||
}
|
||||
}
|
||||
await this.repository.update({ id: taskId }, task);
|
||||
return task;
|
||||
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
|
||||
await this.repository.update({ id }, task);
|
||||
this.logger.info('[updateByEvent] success. taskId: %s', id);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
{ error },
|
||||
'[updateByEvent] failed. taskId: %s',
|
||||
taskId,
|
||||
);
|
||||
this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user