diff --git a/src/pipeline-tasks/pipeline-task-flush.service.spec.ts b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts index fa2a26f..273f6f7 100644 --- a/src/pipeline-tasks/pipeline-task-flush.service.spec.ts +++ b/src/pipeline-tasks/pipeline-task-flush.service.spec.ts @@ -50,17 +50,20 @@ describe('PipelineTaskFlushService', () => { }); describe('write', () => { + const amqpMsg = { + properties: { headers: { sender: 'test' } }, + } as any; it('normal', async () => { const testEvent = new PipelineTaskEvent(); testEvent.taskId = 'test'; testEvent.status = TaskStatuses.working; const rpush = jest.spyOn(redisService.getClient(), 'rpush'); const request = jest.spyOn(amqpConnection, 'request'); - await service.write(testEvent); + await service.write(testEvent, amqpMsg); expect(rpush).toBeCalledTimes(1); expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test'); expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent)); - expect(request).toBeCalledTimes(0); + expect(request).toBeCalledTimes(1); }); it('event for which task done', async () => { const testEvent = new PipelineTaskEvent(); @@ -68,13 +71,17 @@ describe('PipelineTaskFlushService', () => { testEvent.status = TaskStatuses.success; const rpush = jest.spyOn(redisService.getClient(), 'rpush'); const request = jest.spyOn(amqpConnection, 'request'); - await service.write(testEvent); + await service.write(testEvent, amqpMsg); expect(rpush).toBeCalledTimes(1); expect(request).toBeCalledTimes(1); expect(request.mock.calls[0][0]).toMatchObject({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: ROUTE_PIPELINE_TASK_DONE, - payload: { taskId: 'test', status: TaskStatuses.success }, + payload: { + taskId: 'test', + status: TaskStatuses.success, + runOn: 'test', + }, }); }); }); diff --git a/src/pipeline-tasks/pipeline-task.runner.ts b/src/pipeline-tasks/pipeline-task.runner.ts index cfbf0fe..742e433 100644 --- a/src/pipeline-tasks/pipeline-task.runner.ts +++ b/src/pipeline-tasks/pipeline-task.runner.ts @@ -5,7 +5,11 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio import { PipelineUnits } from './enums/pipeline-units.enum'; import { TaskStatuses } from './enums/task-statuses.enum'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; -import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; +import { + AmqpConnection, + RabbitRPC, + RabbitSubscribe, +} from '@golevelup/nestjs-rabbitmq'; import { PipelineTaskEvent } from './models/pipeline-task-event'; import { last } from 'ramda'; import { Inject } from '@nestjs/common'; @@ -28,6 +32,7 @@ type Spawn = typeof spawn; export class PipelineTaskRunner { readonly processes = new Map(); + readonly stopTaskIds = new Set(); constructor( private readonly reposService: ReposService, @@ -50,7 +55,7 @@ export class PipelineTaskRunner { this.logger.error({ task, err }, err.message); } } - @RabbitSubscribe({ + @RabbitRPC({ exchange: EXCHANGE_PIPELINE_TASK_TOPIC, routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL), queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL), @@ -61,12 +66,16 @@ export class PipelineTaskRunner { }) async onStopTask(task: PipelineTask) { this.logger.info({ task }, 'on stop task [%s].', task.id); + this.stopTaskIds.add(task.id); const process = this.processes.get(task.id); if (process) { this.logger.info({ task }, 'send signal SIGINT to child process.'); process.kill('SIGINT'); setTimeout(() => { + setTimeout(() => { + this.stopTaskIds.delete(task.id); + }, 10_000); if (process === this.processes.get(task.id)) { this.logger.info({ task }, 'send signal SIGKILL to child process.'); process.kill('SIGKILL'); @@ -82,6 +91,7 @@ export class PipelineTaskRunner { } else { this.logger.info({ task }, 'child process is not running.'); } + return true; } async doTask(task: PipelineTask) { @@ -151,6 +161,9 @@ export class PipelineTaskRunner { try { for (const script of scripts) { this.logger.debug('begin runScript %s', script); + if (this.stopTaskIds.has(task.id)) { + throw new ApplicationException('Task is be KILLED'); + } await this.runScript(script, workspaceRoot, task, unit); this.logger.debug('end runScript %s', script); } @@ -278,6 +291,9 @@ export class PipelineTaskRunner { if (code === 0) { return resolve(); } + if (this.stopTaskIds.has(task.id)) { + throw reject(new ApplicationException('Task is be KILLED')); + } return reject(new ApplicationException('exec script failed')); }); }); diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.ts b/src/pipeline-tasks/pipeline-tasks.resolver.ts index 079f4aa..5702282 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.ts @@ -55,5 +55,7 @@ export class PipelineTasksResolver { @Mutation(() => Boolean) async stopPipelineTask(@Args('id') id: string) { const task = await this.service.findTaskById(id); + await this.service.stopTask(task); + return true; } } diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 5a04ebe..b3fa1ed 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { BadRequestException, Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { PipelineTask } from './pipeline-task.entity'; import { Repository } from 'typeorm'; @@ -16,6 +16,8 @@ import { find, isNil, propEq } from 'ramda'; import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; +import { getAppInstanceRouteKey } from '../commons/utils/rabbit-mq'; +import { ROUTE_PIPELINE_TASK_KILL } from './pipeline-tasks.constants'; const log = debug('fennec:pipeline-tasks:service'); @@ -138,4 +140,17 @@ export class PipelineTasksService { ); } } + + async stopTask(task: PipelineTask) { + if (isNil(task.runOn)) { + throw new BadRequestException( + "the task have not running instance on database. field 'runOn' is nil", + ); + } + await this.amqpConnection.request({ + exchange: EXCHANGE_PIPELINE_TASK_TOPIC, + routingKey: getAppInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL, task.runOn), + payload: task, + }); + } }