feat(pipeline-tasks): 强制停止任务接口。
This commit is contained in:
		@@ -50,17 +50,20 @@ describe('PipelineTaskFlushService', () => {
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('write', () => {
 | 
			
		||||
    const amqpMsg = {
 | 
			
		||||
      properties: { headers: { sender: 'test' } },
 | 
			
		||||
    } as any;
 | 
			
		||||
    it('normal', async () => {
 | 
			
		||||
      const testEvent = new PipelineTaskEvent();
 | 
			
		||||
      testEvent.taskId = 'test';
 | 
			
		||||
      testEvent.status = TaskStatuses.working;
 | 
			
		||||
      const rpush = jest.spyOn(redisService.getClient(), 'rpush');
 | 
			
		||||
      const request = jest.spyOn(amqpConnection, 'request');
 | 
			
		||||
      await service.write(testEvent);
 | 
			
		||||
      await service.write(testEvent, amqpMsg);
 | 
			
		||||
      expect(rpush).toBeCalledTimes(1);
 | 
			
		||||
      expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
 | 
			
		||||
      expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
 | 
			
		||||
      expect(request).toBeCalledTimes(0);
 | 
			
		||||
      expect(request).toBeCalledTimes(1);
 | 
			
		||||
    });
 | 
			
		||||
    it('event for which task done', async () => {
 | 
			
		||||
      const testEvent = new PipelineTaskEvent();
 | 
			
		||||
@@ -68,13 +71,17 @@ describe('PipelineTaskFlushService', () => {
 | 
			
		||||
      testEvent.status = TaskStatuses.success;
 | 
			
		||||
      const rpush = jest.spyOn(redisService.getClient(), 'rpush');
 | 
			
		||||
      const request = jest.spyOn(amqpConnection, 'request');
 | 
			
		||||
      await service.write(testEvent);
 | 
			
		||||
      await service.write(testEvent, amqpMsg);
 | 
			
		||||
      expect(rpush).toBeCalledTimes(1);
 | 
			
		||||
      expect(request).toBeCalledTimes(1);
 | 
			
		||||
      expect(request.mock.calls[0][0]).toMatchObject({
 | 
			
		||||
        exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
 | 
			
		||||
        routingKey: ROUTE_PIPELINE_TASK_DONE,
 | 
			
		||||
        payload: { taskId: 'test', status: TaskStatuses.success },
 | 
			
		||||
        payload: {
 | 
			
		||||
          taskId: 'test',
 | 
			
		||||
          status: TaskStatuses.success,
 | 
			
		||||
          runOn: 'test',
 | 
			
		||||
        },
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 
 | 
			
		||||
@@ -5,7 +5,11 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
 | 
			
		||||
import { PipelineUnits } from './enums/pipeline-units.enum';
 | 
			
		||||
import { TaskStatuses } from './enums/task-statuses.enum';
 | 
			
		||||
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
 | 
			
		||||
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
 | 
			
		||||
import {
 | 
			
		||||
  AmqpConnection,
 | 
			
		||||
  RabbitRPC,
 | 
			
		||||
  RabbitSubscribe,
 | 
			
		||||
} from '@golevelup/nestjs-rabbitmq';
 | 
			
		||||
import { PipelineTaskEvent } from './models/pipeline-task-event';
 | 
			
		||||
import { last } from 'ramda';
 | 
			
		||||
import { Inject } from '@nestjs/common';
 | 
			
		||||
@@ -28,6 +32,7 @@ type Spawn = typeof spawn;
 | 
			
		||||
 | 
			
		||||
export class PipelineTaskRunner {
 | 
			
		||||
  readonly processes = new Map<string, ChildProcessWithoutNullStreams>();
 | 
			
		||||
  readonly stopTaskIds = new Set<string>();
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    private readonly reposService: ReposService,
 | 
			
		||||
@@ -50,7 +55,7 @@ export class PipelineTaskRunner {
 | 
			
		||||
      this.logger.error({ task, err }, err.message);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  @RabbitSubscribe({
 | 
			
		||||
  @RabbitRPC({
 | 
			
		||||
    exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
 | 
			
		||||
    routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL),
 | 
			
		||||
    queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL),
 | 
			
		||||
@@ -61,12 +66,16 @@ export class PipelineTaskRunner {
 | 
			
		||||
  })
 | 
			
		||||
  async onStopTask(task: PipelineTask) {
 | 
			
		||||
    this.logger.info({ task }, 'on stop task [%s].', task.id);
 | 
			
		||||
    this.stopTaskIds.add(task.id);
 | 
			
		||||
    const process = this.processes.get(task.id);
 | 
			
		||||
    if (process) {
 | 
			
		||||
      this.logger.info({ task }, 'send signal SIGINT to child process.');
 | 
			
		||||
      process.kill('SIGINT');
 | 
			
		||||
 | 
			
		||||
      setTimeout(() => {
 | 
			
		||||
        setTimeout(() => {
 | 
			
		||||
          this.stopTaskIds.delete(task.id);
 | 
			
		||||
        }, 10_000);
 | 
			
		||||
        if (process === this.processes.get(task.id)) {
 | 
			
		||||
          this.logger.info({ task }, 'send signal SIGKILL to child process.');
 | 
			
		||||
          process.kill('SIGKILL');
 | 
			
		||||
@@ -82,6 +91,7 @@ export class PipelineTaskRunner {
 | 
			
		||||
    } else {
 | 
			
		||||
      this.logger.info({ task }, 'child process is not running.');
 | 
			
		||||
    }
 | 
			
		||||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async doTask(task: PipelineTask) {
 | 
			
		||||
@@ -151,6 +161,9 @@ export class PipelineTaskRunner {
 | 
			
		||||
    try {
 | 
			
		||||
      for (const script of scripts) {
 | 
			
		||||
        this.logger.debug('begin runScript %s', script);
 | 
			
		||||
        if (this.stopTaskIds.has(task.id)) {
 | 
			
		||||
          throw new ApplicationException('Task is be KILLED');
 | 
			
		||||
        }
 | 
			
		||||
        await this.runScript(script, workspaceRoot, task, unit);
 | 
			
		||||
        this.logger.debug('end runScript %s', script);
 | 
			
		||||
      }
 | 
			
		||||
@@ -278,6 +291,9 @@ export class PipelineTaskRunner {
 | 
			
		||||
        if (code === 0) {
 | 
			
		||||
          return resolve();
 | 
			
		||||
        }
 | 
			
		||||
        if (this.stopTaskIds.has(task.id)) {
 | 
			
		||||
          throw reject(new ApplicationException('Task is be KILLED'));
 | 
			
		||||
        }
 | 
			
		||||
        return reject(new ApplicationException('exec script failed'));
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
@@ -55,5 +55,7 @@ export class PipelineTasksResolver {
 | 
			
		||||
  @Mutation(() => Boolean)
 | 
			
		||||
  async stopPipelineTask(@Args('id') id: string) {
 | 
			
		||||
    const task = await this.service.findTaskById(id);
 | 
			
		||||
    await this.service.stopTask(task);
 | 
			
		||||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
import { Injectable } from '@nestjs/common';
 | 
			
		||||
import { BadRequestException, Injectable } from '@nestjs/common';
 | 
			
		||||
import { InjectRepository } from '@nestjs/typeorm';
 | 
			
		||||
import { PipelineTask } from './pipeline-task.entity';
 | 
			
		||||
import { Repository } from 'typeorm';
 | 
			
		||||
@@ -16,6 +16,8 @@ import { find, isNil, propEq } from 'ramda';
 | 
			
		||||
import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
 | 
			
		||||
import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
 | 
			
		||||
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
 | 
			
		||||
import { getAppInstanceRouteKey } from '../commons/utils/rabbit-mq';
 | 
			
		||||
import { ROUTE_PIPELINE_TASK_KILL } from './pipeline-tasks.constants';
 | 
			
		||||
 | 
			
		||||
const log = debug('fennec:pipeline-tasks:service');
 | 
			
		||||
 | 
			
		||||
@@ -138,4 +140,17 @@ export class PipelineTasksService {
 | 
			
		||||
      );
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async stopTask(task: PipelineTask) {
 | 
			
		||||
    if (isNil(task.runOn)) {
 | 
			
		||||
      throw new BadRequestException(
 | 
			
		||||
        "the task have not running instance on database. field 'runOn' is nil",
 | 
			
		||||
      );
 | 
			
		||||
    }
 | 
			
		||||
    await this.amqpConnection.request({
 | 
			
		||||
      exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
 | 
			
		||||
      routingKey: getAppInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL, task.runOn),
 | 
			
		||||
      payload: task,
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user