Compare commits

...

3 Commits

9 changed files with 115 additions and 18 deletions

29
package-lock.json generated
View File

@ -16,6 +16,7 @@
"@nestjs/graphql": "^7.9.8", "@nestjs/graphql": "^7.9.8",
"@nestjs/platform-express": "^7.5.1", "@nestjs/platform-express": "^7.5.1",
"@nestjs/typeorm": "^7.1.5", "@nestjs/typeorm": "^7.1.5",
"@types/amqplib": "^0.8.0",
"@types/bull": "^3.15.0", "@types/bull": "^3.15.0",
"@types/ramda": "^0.27.38", "@types/ramda": "^0.27.38",
"apollo-server-express": "^2.19.2", "apollo-server-express": "^2.19.2",
@ -2758,6 +2759,15 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"node_modules/@types/amqplib": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz",
"integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==",
"dependencies": {
"@types/bluebird": "*",
"@types/node": "*"
}
},
"node_modules/@types/babel__core": { "node_modules/@types/babel__core": {
"version": "7.1.14", "version": "7.1.14",
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
@ -2799,6 +2809,11 @@
"@babel/types": "^7.3.0" "@babel/types": "^7.3.0"
} }
}, },
"node_modules/@types/bluebird": {
"version": "3.5.35",
"resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz",
"integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ=="
},
"node_modules/@types/body-parser": { "node_modules/@types/body-parser": {
"version": "1.19.0", "version": "1.19.0",
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
@ -17553,6 +17568,15 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"@types/amqplib": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.0.tgz",
"integrity": "sha512-RDojJ8WACs43HIfWSQGnAVwgNzjMGx4YMNeW7jptgAFgkG1EpNQqts+cND5HYWdYgTM58b+RHe675b0i4A9WpQ==",
"requires": {
"@types/bluebird": "*",
"@types/node": "*"
}
},
"@types/babel__core": { "@types/babel__core": {
"version": "7.1.14", "version": "7.1.14",
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.14.tgz",
@ -17594,6 +17618,11 @@
"@babel/types": "^7.3.0" "@babel/types": "^7.3.0"
} }
}, },
"@types/bluebird": {
"version": "3.5.35",
"resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.35.tgz",
"integrity": "sha512-2WeeXK7BuQo7yPI4WGOBum90SzF/f8rqlvpaXx4rjeTmNssGRDHWf7fgDUH90xMB3sUOu716fUK5d+OVx0+ncQ=="
},
"@types/body-parser": { "@types/body-parser": {
"version": "1.19.0", "version": "1.19.0",
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",

View File

@ -29,6 +29,7 @@
"@nestjs/graphql": "^7.9.8", "@nestjs/graphql": "^7.9.8",
"@nestjs/platform-express": "^7.5.1", "@nestjs/platform-express": "^7.5.1",
"@nestjs/typeorm": "^7.1.5", "@nestjs/typeorm": "^7.1.5",
"@types/amqplib": "^0.8.0",
"@types/bull": "^3.15.0", "@types/bull": "^3.15.0",
"@types/ramda": "^0.27.38", "@types/ramda": "^0.27.38",
"apollo-server-express": "^2.19.2", "apollo-server-express": "^2.19.2",

View File

@ -50,17 +50,20 @@ describe('PipelineTaskFlushService', () => {
}); });
describe('write', () => { describe('write', () => {
const amqpMsg = {
properties: { headers: { sender: 'test' } },
} as any;
it('normal', async () => { it('normal', async () => {
const testEvent = new PipelineTaskEvent(); const testEvent = new PipelineTaskEvent();
testEvent.taskId = 'test'; testEvent.taskId = 'test';
testEvent.status = TaskStatuses.working; testEvent.status = TaskStatuses.working;
const rpush = jest.spyOn(redisService.getClient(), 'rpush'); const rpush = jest.spyOn(redisService.getClient(), 'rpush');
const request = jest.spyOn(amqpConnection, 'request'); const request = jest.spyOn(amqpConnection, 'request');
await service.write(testEvent); await service.write(testEvent, amqpMsg);
expect(rpush).toBeCalledTimes(1); expect(rpush).toBeCalledTimes(1);
expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test'); expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent)); expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
expect(request).toBeCalledTimes(0); expect(request).toBeCalledTimes(1);
}); });
it('event for which task done', async () => { it('event for which task done', async () => {
const testEvent = new PipelineTaskEvent(); const testEvent = new PipelineTaskEvent();
@ -68,13 +71,17 @@ describe('PipelineTaskFlushService', () => {
testEvent.status = TaskStatuses.success; testEvent.status = TaskStatuses.success;
const rpush = jest.spyOn(redisService.getClient(), 'rpush'); const rpush = jest.spyOn(redisService.getClient(), 'rpush');
const request = jest.spyOn(amqpConnection, 'request'); const request = jest.spyOn(amqpConnection, 'request');
await service.write(testEvent); await service.write(testEvent, amqpMsg);
expect(rpush).toBeCalledTimes(1); expect(rpush).toBeCalledTimes(1);
expect(request).toBeCalledTimes(1); expect(request).toBeCalledTimes(1);
expect(request.mock.calls[0][0]).toMatchObject({ expect(request.mock.calls[0][0]).toMatchObject({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC, exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: ROUTE_PIPELINE_TASK_DONE, routingKey: ROUTE_PIPELINE_TASK_DONE,
payload: { taskId: 'test', status: TaskStatuses.success }, payload: {
taskId: 'test',
status: TaskStatuses.success,
runOn: 'test',
},
}); });
}); });
}); });

View File

@ -1,10 +1,10 @@
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { deserialize } from 'class-transformer'; import { deserialize } from 'class-transformer';
import { RedisService } from 'nestjs-redis'; import { RedisService } from 'nestjs-redis';
import { isNil } from 'ramda'; import { isNil } from 'ramda';
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq'; import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
import { terminalTaskStatuses } from './enums/task-statuses.enum';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
import { import {
EXCHANGE_PIPELINE_TASK_TOPIC, EXCHANGE_PIPELINE_TASK_TOPIC,
@ -32,16 +32,20 @@ export class PipelineTaskFlushService {
durable: true, durable: true,
}, },
}) })
async write(message: PipelineTaskEvent) { async write(message: PipelineTaskEvent, amqpMsg: ConsumeMessage) {
const client = this.redisService.getClient(); const client = this.redisService.getClient();
await client.rpush(this.getKey(message.taskId), JSON.stringify(message)); await client.rpush(this.getKey(message.taskId), JSON.stringify(message));
await client.expire(this.getKey(message.taskId), 600); // ten minutes await client.expire(this.getKey(message.taskId), 600); // ten minutes
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) { if (isNil(message.unit)) {
try { try {
await this.amqpConnection.request({ await this.amqpConnection.request({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC, exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: ROUTE_PIPELINE_TASK_DONE, routingKey: ROUTE_PIPELINE_TASK_DONE,
payload: { taskId: message.taskId, status: message.status }, payload: {
taskId: message.taskId,
status: message.status,
runOn: amqpMsg.properties.headers.sender,
},
}); });
} catch (error) { } catch (error) {
console.log(error); console.log(error);

View File

@ -36,4 +36,7 @@ export class PipelineTask extends AppBaseEntity {
@Column({ nullable: true }) @Column({ nullable: true })
endedAt?: Date; endedAt?: Date;
@Column({ nullable: true })
runOn: string;
} }

View File

@ -5,19 +5,34 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
import { PipelineUnits } from './enums/pipeline-units.enum'; import { PipelineUnits } from './enums/pipeline-units.enum';
import { TaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import {
AmqpConnection,
RabbitRPC,
RabbitSubscribe,
} from '@golevelup/nestjs-rabbitmq';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
import { last } from 'ramda'; import { last } from 'ramda';
import { Inject } from '@nestjs/common'; import { Inject } from '@nestjs/common';
import {
EXCHANGE_PIPELINE_TASK_TOPIC,
QUEUE_PIPELINE_TASK_KILL,
ROUTE_PIPELINE_TASK_KILL,
} from './pipeline-tasks.constants';
import { import {
EXCHANGE_PIPELINE_TASK_FANOUT, EXCHANGE_PIPELINE_TASK_FANOUT,
ROUTE_PIPELINE_TASK_LOG, ROUTE_PIPELINE_TASK_LOG,
} from './pipeline-tasks.constants'; } from './pipeline-tasks.constants';
import {
getInstanceName,
getSelfInstanceQueueKey,
getSelfInstanceRouteKey,
} from '../commons/utils/rabbit-mq';
type Spawn = typeof spawn; type Spawn = typeof spawn;
export class PipelineTaskRunner { export class PipelineTaskRunner {
readonly processes = new Map<string, ChildProcessWithoutNullStreams>(); readonly processes = new Map<string, ChildProcessWithoutNullStreams>();
readonly stopTaskIds = new Set<string>();
constructor( constructor(
private readonly reposService: ReposService, private readonly reposService: ReposService,
@ -40,19 +55,27 @@ export class PipelineTaskRunner {
this.logger.error({ task, err }, err.message); this.logger.error({ task, err }, err.message);
} }
} }
@RabbitSubscribe({ @RabbitRPC({
exchange: 'stop-pipeline-task', exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: 'mac', routingKey: getSelfInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL),
queue: 'mac.stop-pipeline-task', queue: getSelfInstanceQueueKey(QUEUE_PIPELINE_TASK_KILL),
queueOptions: {
autoDelete: true,
durable: true,
},
}) })
async onStopTask(task: PipelineTask) { async onStopTask(task: PipelineTask) {
this.logger.info({ task }, 'on stop task [%s].', task.id); this.logger.info({ task }, 'on stop task [%s].', task.id);
this.stopTaskIds.add(task.id);
const process = this.processes.get(task.id); const process = this.processes.get(task.id);
if (process) { if (process) {
this.logger.info({ task }, 'send signal SIGINT to child process.'); this.logger.info({ task }, 'send signal SIGINT to child process.');
process.kill('SIGINT'); process.kill('SIGINT');
setTimeout(() => { setTimeout(() => {
setTimeout(() => {
this.stopTaskIds.delete(task.id);
}, 10_000);
if (process === this.processes.get(task.id)) { if (process === this.processes.get(task.id)) {
this.logger.info({ task }, 'send signal SIGKILL to child process.'); this.logger.info({ task }, 'send signal SIGKILL to child process.');
process.kill('SIGKILL'); process.kill('SIGKILL');
@ -68,6 +91,7 @@ export class PipelineTaskRunner {
} else { } else {
this.logger.info({ task }, 'child process is not running.'); this.logger.info({ task }, 'child process is not running.');
} }
return true;
} }
async doTask(task: PipelineTask) { async doTask(task: PipelineTask) {
@ -137,6 +161,9 @@ export class PipelineTaskRunner {
try { try {
for (const script of scripts) { for (const script of scripts) {
this.logger.debug('begin runScript %s', script); this.logger.debug('begin runScript %s', script);
if (this.stopTaskIds.has(task.id)) {
throw new ApplicationException('Task is be KILLED');
}
await this.runScript(script, workspaceRoot, task, unit); await this.runScript(script, workspaceRoot, task, unit);
this.logger.debug('end runScript %s', script); this.logger.debug('end runScript %s', script);
} }
@ -207,7 +234,11 @@ export class PipelineTaskRunner {
status, status,
}; };
this.amqpConnection this.amqpConnection
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event) .publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event, {
headers: {
sender: getInstanceName(),
},
})
.catch((error) => { .catch((error) => {
this.logger.error( this.logger.error(
{ error, event }, { error, event },
@ -260,6 +291,9 @@ export class PipelineTaskRunner {
if (code === 0) { if (code === 0) {
return resolve(); return resolve();
} }
if (this.stopTaskIds.has(task.id)) {
throw reject(new ApplicationException('Task is be KILLED'));
}
return reject(new ApplicationException('exec script failed')); return reject(new ApplicationException('exec script failed'));
}); });
}); });

View File

@ -5,3 +5,5 @@ export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log'; export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done'; export const QUEUE_PIPELINE_TASK_DONE = 'pipeline-task-done';
export const ROUTE_PIPELINE_TASK_KILL = 'pipeline-task-kill';
export const QUEUE_PIPELINE_TASK_KILL = 'pipeline-task-kill';

View File

@ -55,5 +55,7 @@ export class PipelineTasksResolver {
@Mutation(() => Boolean) @Mutation(() => Boolean)
async stopPipelineTask(@Args('id') id: string) { async stopPipelineTask(@Args('id') id: string) {
const task = await this.service.findTaskById(id); const task = await this.service.findTaskById(id);
await this.service.stopTask(task);
return true;
} }
} }

View File

@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'; import { BadRequestException, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { PipelineTask } from './pipeline-task.entity'; import { PipelineTask } from './pipeline-task.entity';
import { Repository } from 'typeorm'; import { Repository } from 'typeorm';
@ -7,7 +7,6 @@ import { Pipeline } from '../pipelines/pipeline.entity';
import debug from 'debug'; import debug from 'debug';
import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; import { AmqpConnection, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { import {
EXCHANGE_PIPELINE_TASK_FANOUT,
EXCHANGE_PIPELINE_TASK_TOPIC, EXCHANGE_PIPELINE_TASK_TOPIC,
QUEUE_PIPELINE_TASK_DONE, QUEUE_PIPELINE_TASK_DONE,
ROUTE_PIPELINE_TASK_DONE, ROUTE_PIPELINE_TASK_DONE,
@ -17,6 +16,8 @@ import { find, isNil, propEq } from 'ramda';
import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses, terminalTaskStatuses } from './enums/task-statuses.enum';
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino'; import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
import { getAppInstanceRouteKey } from '../commons/utils/rabbit-mq';
import { ROUTE_PIPELINE_TASK_KILL } from './pipeline-tasks.constants';
const log = debug('fennec:pipeline-tasks:service'); const log = debug('fennec:pipeline-tasks:service');
@ -84,7 +85,7 @@ export class PipelineTasksService {
durable: true, durable: true,
}, },
}) })
async updateByEvent({ taskId }: { taskId: string }) { async updateByEvent({ taskId, runOn }: { taskId: string; runOn: string }) {
try { try {
const [events, task] = await Promise.all([ const [events, task] = await Promise.all([
this.eventFlushService.read(taskId), this.eventFlushService.read(taskId),
@ -127,9 +128,10 @@ export class PipelineTasksService {
l.status = event.status; l.status = event.status;
} }
} }
task.runOn = runOn;
await this.repository.update({ id: taskId }, task); await this.repository.update({ id: taskId }, task);
return task;
this.logger.info('[updateByEvent] success. taskId: %s', taskId); this.logger.info('[updateByEvent] success. taskId: %s', taskId);
return task;
} catch (error) { } catch (error) {
this.logger.error( this.logger.error(
{ error }, { error },
@ -138,4 +140,17 @@ export class PipelineTasksService {
); );
} }
} }
async stopTask(task: PipelineTask) {
if (isNil(task.runOn)) {
throw new BadRequestException(
"the task have not running instance on database. field 'runOn' is nil",
);
}
await this.amqpConnection.request({
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
routingKey: getAppInstanceRouteKey(ROUTE_PIPELINE_TASK_KILL, task.runOn),
payload: task,
});
}
} }