Compare commits

..

No commits in common. "37f8ae19be2f123fcb3473dfa4dc34d96fed0468" and "646f68d2989760c66d1224c26d23a6d8079a34d7" have entirely different histories.

4 changed files with 26 additions and 34 deletions

View File

@ -18,7 +18,6 @@ describe('PipelineTaskFlushService', () => {
const redisClient = { const redisClient = {
rpush: jest.fn(() => Promise.resolve()), rpush: jest.fn(() => Promise.resolve()),
lrange: jest.fn(() => Promise.resolve()), lrange: jest.fn(() => Promise.resolve()),
expire: jest.fn(() => Promise.resolve()),
}; };
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [

View File

@ -37,15 +37,11 @@ export class PipelineTaskFlushService {
await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
await client.expire(this.getKey(message.taskId), 600); // ten minutes await client.expire(this.getKey(message.taskId), 600); // ten minutes
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
try { this.amqpConnection.request({
await this.amqpConnection.request({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: ROUTE_PIPELINE_TASK_DONE,
routingKey: ROUTE_PIPELINE_TASK_DONE, payload: { taskId: message.taskId, status: message.status },
payload: { taskId: message.taskId, status: message.status }, });
});
} catch (error) {
console.log(error);
}
} }
} }

View File

@ -4,15 +4,26 @@ import { getRepositoryToken } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity'; import { PipelineTask } from './pipeline-task.entity';
import { Pipeline } from '../pipelines/pipeline.entity'; import { Pipeline } from '../pipelines/pipeline.entity';
import { Repository } from 'typeorm'; import { Repository } from 'typeorm';
import { Queue } from 'bull';
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq'; import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
describe('PipelineTasksService', () => { describe('PipelineTasksService', () => {
let service: PipelineTasksService; let service: PipelineTasksService;
let module: TestingModule; let module: TestingModule;
let taskRepository: Repository<PipelineTask>; let taskRepository: Repository<PipelineTask>;
let pipelineRepository: Repository<Pipeline>; let pipelineRepository: Repository<Pipeline>;
const getBasePipeline = () =>
({
id: 'test',
name: '测试流水线',
branch: 'master',
workUnitMetadata: {},
project: {
id: 'test-project',
},
} as Pipeline);
let redisClient;
let taskQueue: Queue;
beforeEach(async () => { beforeEach(async () => {
module = await Test.createTestingModule({ module = await Test.createTestingModule({
@ -30,14 +41,6 @@ describe('PipelineTasksService', () => {
provide: AmqpConnection, provide: AmqpConnection,
useValue: {}, useValue: {},
}, },
{
provide: PipelineTaskFlushService,
useValue: {},
},
{
provide: getLoggerToken(PipelineTasksService.name),
useValue: new PinoLogger({}),
},
], ],
}).compile(); }).compile();

View File

@ -8,7 +8,6 @@ import debug from 'debug';
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { import {
EXCHANGE_PIPELINE_TASK_FANOUT, EXCHANGE_PIPELINE_TASK_FANOUT,
EXCHANGE_PIPELINE_TASK_TOPIC,
QUEUE_PIPELINE_TASK_DONE, QUEUE_PIPELINE_TASK_DONE,
ROUTE_PIPELINE_TASK_DONE, ROUTE_PIPELINE_TASK_DONE,
} from './pipeline-tasks.constants'; } from './pipeline-tasks.constants';
@ -76,7 +75,7 @@ export class PipelineTasksService {
} }
@RabbitRPC({ @RabbitRPC({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC, exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
routingKey: ROUTE_PIPELINE_TASK_DONE, routingKey: ROUTE_PIPELINE_TASK_DONE,
queue: QUEUE_PIPELINE_TASK_DONE, queue: QUEUE_PIPELINE_TASK_DONE,
queueOptions: { queueOptions: {
@ -84,13 +83,13 @@ export class PipelineTasksService {
durable: true, durable: true,
}, },
}) })
async updateByEvent({ taskId }: { taskId: string }) { async updateByEvent({ id }: { id: string }) {
try { try {
const [events, task] = await Promise.all([ const [events, task] = await Promise.all([
this.eventFlushService.read(taskId), this.eventFlushService.read(id),
this.findTaskById(taskId), this.findTaskById(id),
]); ]);
this.logger.info('[updateByEvent] start. taskId: %s', taskId); this.logger.info('[updateByEvent] start. taskId: %s', id);
for (const event of events) { for (const event of events) {
if (isNil(event.unit)) { if (isNil(event.unit)) {
@ -127,15 +126,10 @@ export class PipelineTasksService {
l.status = event.status; l.status = event.status;
} }
} }
await this.repository.update({ id: taskId }, task); await this.repository.update({ id }, task);
return task; this.logger.info('[updateByEvent] success. taskId: %s', id);
this.logger.info('[updateByEvent] success. taskId: %s', taskId);
} catch (error) { } catch (error) {
this.logger.error( this.logger.error({ error }, '[updateByEvent] failed. taskId: %s', id);
{ error },
'[updateByEvent] failed. taskId: %s',
taskId,
);
} }
} }
} }