Compare commits
3 Commits
3ee41ece67
...
ead32a1204
Author | SHA1 | Date | |
---|---|---|---|
|
ead32a1204 | ||
|
20612d4301 | ||
|
7091f9df6a |
@ -11,3 +11,5 @@ registerEnumType(TaskStatuses, {
|
|||||||
name: 'TaskStatuses',
|
name: 'TaskStatuses',
|
||||||
description: '任务状态',
|
description: '任务状态',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const terminalTaskStatuses = [TaskStatuses.success, TaskStatuses.failed];
|
||||||
|
@ -1,13 +1,25 @@
|
|||||||
|
import { Field, ObjectType } from '@nestjs/graphql';
|
||||||
import { PipelineUnits } from '../enums/pipeline-units.enum';
|
import { PipelineUnits } from '../enums/pipeline-units.enum';
|
||||||
import { TaskStatuses } from '../enums/task-statuses.enum';
|
import { TaskStatuses } from '../enums/task-statuses.enum';
|
||||||
|
import { Type } from 'class-transformer';
|
||||||
|
|
||||||
|
@ObjectType()
|
||||||
export class PipelineTaskEvent {
|
export class PipelineTaskEvent {
|
||||||
|
@Field()
|
||||||
taskId: string;
|
taskId: string;
|
||||||
|
@Field()
|
||||||
pipelineId: string;
|
pipelineId: string;
|
||||||
|
@Field()
|
||||||
projectId: string;
|
projectId: string;
|
||||||
|
@Field(() => PipelineUnits, { nullable: true })
|
||||||
unit: PipelineUnits | null;
|
unit: PipelineUnits | null;
|
||||||
|
@Field()
|
||||||
|
@Type(() => Date)
|
||||||
emittedAt: Date;
|
emittedAt: Date;
|
||||||
|
@Field()
|
||||||
message: string;
|
message: string;
|
||||||
|
@Field()
|
||||||
messageType: 'stdout' | 'stderr' | 'stdin';
|
messageType: 'stdout' | 'stderr' | 'stdin';
|
||||||
|
@Field(() => TaskStatuses)
|
||||||
status: TaskStatuses;
|
status: TaskStatuses;
|
||||||
}
|
}
|
||||||
|
80
src/pipeline-tasks/pipeline-task-flush.service.spec.ts
Normal file
80
src/pipeline-tasks/pipeline-task-flush.service.spec.ts
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
|
||||||
|
import { Test, TestingModule } from '@nestjs/testing';
|
||||||
|
import { RedisService } from 'nestjs-redis';
|
||||||
|
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
|
||||||
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
|
import { TaskStatuses } from './enums/task-statuses.enum';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
ROUTE_PIPELINE_TASK_DONE,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
|
||||||
|
describe('PipelineTaskFlushService', () => {
|
||||||
|
let service: PipelineTaskFlushService;
|
||||||
|
let redisService: RedisService;
|
||||||
|
let amqpConnection: AmqpConnection;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const redisClient = {
|
||||||
|
rpush: jest.fn(() => Promise.resolve()),
|
||||||
|
lrange: jest.fn(() => Promise.resolve()),
|
||||||
|
};
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
providers: [
|
||||||
|
PipelineTaskFlushService,
|
||||||
|
{
|
||||||
|
provide: RedisService,
|
||||||
|
useValue: {
|
||||||
|
getClient() {
|
||||||
|
return redisClient;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: AmqpConnection,
|
||||||
|
useValue: {
|
||||||
|
request: jest.fn(() => Promise.resolve()),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
service = module.get<PipelineTaskFlushService>(PipelineTaskFlushService);
|
||||||
|
redisService = module.get<RedisService>(RedisService);
|
||||||
|
amqpConnection = module.get<AmqpConnection>(AmqpConnection);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be defined', () => {
|
||||||
|
expect(service).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('write', () => {
|
||||||
|
it('normal', async () => {
|
||||||
|
const testEvent = new PipelineTaskEvent();
|
||||||
|
testEvent.taskId = 'test';
|
||||||
|
testEvent.status = TaskStatuses.working;
|
||||||
|
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
||||||
|
const request = jest.spyOn(amqpConnection, 'request');
|
||||||
|
await service.write(testEvent);
|
||||||
|
expect(rpush).toBeCalledTimes(1);
|
||||||
|
expect(rpush.mock.calls[0][0]).toEqual('p-task:log:test');
|
||||||
|
expect(rpush.mock.calls[0][1]).toEqual(JSON.stringify(testEvent));
|
||||||
|
expect(request).toBeCalledTimes(0);
|
||||||
|
});
|
||||||
|
it('event for which task done', async () => {
|
||||||
|
const testEvent = new PipelineTaskEvent();
|
||||||
|
testEvent.taskId = 'test';
|
||||||
|
testEvent.status = TaskStatuses.success;
|
||||||
|
const rpush = jest.spyOn(redisService.getClient(), 'rpush');
|
||||||
|
const request = jest.spyOn(amqpConnection, 'request');
|
||||||
|
await service.write(testEvent);
|
||||||
|
expect(rpush).toBeCalledTimes(1);
|
||||||
|
expect(request).toBeCalledTimes(1);
|
||||||
|
expect(request.mock.calls[0][0]).toMatchObject({
|
||||||
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
|
payload: { taskId: 'test', status: TaskStatuses.success },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
57
src/pipeline-tasks/pipeline-task-flush.service.ts
Normal file
57
src/pipeline-tasks/pipeline-task-flush.service.ts
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
||||||
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { deserialize } from 'class-transformer';
|
||||||
|
import { RedisService } from 'nestjs-redis';
|
||||||
|
import { isNil } from 'ramda';
|
||||||
|
import { getSelfInstanceQueueKey } from '../commons/utils/rabbit-mq';
|
||||||
|
import { terminalTaskStatuses } from './enums/task-statuses.enum';
|
||||||
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
ROUTE_PIPELINE_TASK_DONE,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
ROUTE_PIPELINE_TASK_LOG,
|
||||||
|
QUEUE_WRITE_PIPELINE_TASK_LOG,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class PipelineTaskFlushService {
|
||||||
|
constructor(
|
||||||
|
private readonly redisService: RedisService,
|
||||||
|
private readonly amqpConnection: AmqpConnection,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@RabbitSubscribe({
|
||||||
|
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
routingKey: ROUTE_PIPELINE_TASK_LOG,
|
||||||
|
queue: getSelfInstanceQueueKey(QUEUE_WRITE_PIPELINE_TASK_LOG),
|
||||||
|
queueOptions: {
|
||||||
|
autoDelete: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
async write(message: PipelineTaskEvent) {
|
||||||
|
await this.redisService
|
||||||
|
.getClient()
|
||||||
|
.rpush(this.getKey(message.taskId), JSON.stringify(message));
|
||||||
|
if (isNil(message.unit) && terminalTaskStatuses.includes(message.status)) {
|
||||||
|
this.amqpConnection.request({
|
||||||
|
exchange: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
routingKey: ROUTE_PIPELINE_TASK_DONE,
|
||||||
|
payload: { taskId: message.taskId, status: message.status },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async read(taskId: string) {
|
||||||
|
const raw = await this.redisService
|
||||||
|
.getClient()
|
||||||
|
.lrange(this.getKey(taskId), 0, -1);
|
||||||
|
return raw.map((it) => deserialize(PipelineTaskEvent, it));
|
||||||
|
}
|
||||||
|
|
||||||
|
private getKey(taskId: string) {
|
||||||
|
return `p-task:log:${taskId}`;
|
||||||
|
}
|
||||||
|
}
|
74
src/pipeline-tasks/pipeline-task.logger.spec.ts
Normal file
74
src/pipeline-tasks/pipeline-task.logger.spec.ts
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import { Test, TestingModule } from '@nestjs/testing';
|
||||||
|
import { PipelineTaskLogger } from './pipeline-task.logger';
|
||||||
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
|
import { take, timeout } from 'rxjs/operators';
|
||||||
|
|
||||||
|
describe('PipelineTaskRunner', () => {
|
||||||
|
let logger: PipelineTaskLogger;
|
||||||
|
let module: TestingModule;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
module = await Test.createTestingModule({
|
||||||
|
providers: [PipelineTaskLogger],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
logger = module.get(PipelineTaskLogger);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be defined', () => {
|
||||||
|
expect(logger).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getMessage$', () => {
|
||||||
|
it('normal', async () => {
|
||||||
|
const event = new PipelineTaskEvent();
|
||||||
|
event.taskId = 'test';
|
||||||
|
const emittedAt = new Date();
|
||||||
|
event.emittedAt = emittedAt.toISOString() as any;
|
||||||
|
const message$ = logger.getMessage$('test');
|
||||||
|
|
||||||
|
let receiveEvent;
|
||||||
|
message$.pipe(take(1)).subscribe((value) => (receiveEvent = value));
|
||||||
|
await logger.handleEvent(event);
|
||||||
|
expect(receiveEvent).toMatchObject({
|
||||||
|
...event,
|
||||||
|
emittedAt,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
it('no match', async () => {
|
||||||
|
const event = new PipelineTaskEvent();
|
||||||
|
event.taskId = 'test';
|
||||||
|
const message$ = logger.getMessage$('other');
|
||||||
|
setTimeout(() => {
|
||||||
|
logger.handleEvent(event);
|
||||||
|
});
|
||||||
|
expect(message$.pipe(take(1), timeout(100)).toPromise()).rejects.toMatch(
|
||||||
|
'timeout',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
it('multiple subscribers', async () => {
|
||||||
|
const event = new PipelineTaskEvent();
|
||||||
|
event.taskId = 'test';
|
||||||
|
const message$ = logger.getMessage$('test');
|
||||||
|
const message2$ = logger.getMessage$('test');
|
||||||
|
setTimeout(() => {
|
||||||
|
logger.handleEvent(event);
|
||||||
|
});
|
||||||
|
expect(message$.pipe(take(1), timeout(100)).toPromise()).resolves.toEqual(
|
||||||
|
event,
|
||||||
|
);
|
||||||
|
expect(
|
||||||
|
message2$.pipe(take(1), timeout(100)).toPromise(),
|
||||||
|
).resolves.toEqual(event);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('onModuleDestroy', () => {
|
||||||
|
it('complete observable when destroying module', async () => {
|
||||||
|
logger.onModuleDestroy();
|
||||||
|
await expect(
|
||||||
|
(logger as any).message$.toPromise(),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
37
src/pipeline-tasks/pipeline-task.logger.ts
Normal file
37
src/pipeline-tasks/pipeline-task.logger.ts
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
||||||
|
import { Injectable, OnModuleDestroy } from '@nestjs/common';
|
||||||
|
import { plainToClass } from 'class-transformer';
|
||||||
|
import { Observable, Subject } from 'rxjs';
|
||||||
|
import { filter } from 'rxjs/operators';
|
||||||
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
|
||||||
|
ROUTE_PIPELINE_TASK_LOG,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class PipelineTaskLogger implements OnModuleDestroy {
|
||||||
|
private readonly messageSubject = new Subject<PipelineTaskEvent>();
|
||||||
|
private readonly message$: Observable<PipelineTaskEvent> = this.messageSubject.pipe();
|
||||||
|
|
||||||
|
@RabbitSubscribe({
|
||||||
|
exchange: EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
routingKey: ROUTE_PIPELINE_TASK_LOG,
|
||||||
|
queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
|
||||||
|
queueOptions: {
|
||||||
|
autoDelete: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
async handleEvent(message: PipelineTaskEvent) {
|
||||||
|
this.messageSubject.next(plainToClass(PipelineTaskEvent, message));
|
||||||
|
}
|
||||||
|
|
||||||
|
getMessage$(taskId: string) {
|
||||||
|
return this.message$.pipe(filter((event) => event.taskId === taskId));
|
||||||
|
}
|
||||||
|
|
||||||
|
onModuleDestroy() {
|
||||||
|
this.messageSubject.complete();
|
||||||
|
}
|
||||||
|
}
|
@ -8,6 +8,7 @@ import { TaskStatuses } from './enums/task-statuses.enum';
|
|||||||
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
|
import { getLoggerToken, PinoLogger } from 'nestjs-pino';
|
||||||
import { PipelineTaskRunner } from './pipeline-task.runner';
|
import { PipelineTaskRunner } from './pipeline-task.runner';
|
||||||
import { WorkUnitMetadata } from './models/work-unit-metadata.model';
|
import { WorkUnitMetadata } from './models/work-unit-metadata.model';
|
||||||
|
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
|
||||||
describe('PipelineTaskRunner', () => {
|
describe('PipelineTaskRunner', () => {
|
||||||
let runner: PipelineTaskRunner;
|
let runner: PipelineTaskRunner;
|
||||||
let reposService: ReposService;
|
let reposService: ReposService;
|
||||||
@ -31,6 +32,10 @@ describe('PipelineTaskRunner', () => {
|
|||||||
useValue: () => undefined,
|
useValue: () => undefined,
|
||||||
},
|
},
|
||||||
PipelineTaskRunner,
|
PipelineTaskRunner,
|
||||||
|
{
|
||||||
|
provide: AmqpConnection,
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -5,10 +5,14 @@ import { ApplicationException } from '../commons/exceptions/application.exceptio
|
|||||||
import { PipelineUnits } from './enums/pipeline-units.enum';
|
import { PipelineUnits } from './enums/pipeline-units.enum';
|
||||||
import { TaskStatuses } from './enums/task-statuses.enum';
|
import { TaskStatuses } from './enums/task-statuses.enum';
|
||||||
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
import { InjectPinoLogger, PinoLogger } from 'nestjs-pino';
|
||||||
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
|
||||||
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
import { last } from 'ramda';
|
import { last } from 'ramda';
|
||||||
import { Inject } from '@nestjs/common';
|
import { Inject } from '@nestjs/common';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
ROUTE_PIPELINE_TASK_LOG,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
|
||||||
type Spawn = typeof spawn;
|
type Spawn = typeof spawn;
|
||||||
|
|
||||||
@ -21,6 +25,7 @@ export class PipelineTaskRunner {
|
|||||||
private readonly logger: PinoLogger,
|
private readonly logger: PinoLogger,
|
||||||
@Inject('spawn')
|
@Inject('spawn')
|
||||||
private readonly spawn: Spawn,
|
private readonly spawn: Spawn,
|
||||||
|
private readonly amqpConnection: AmqpConnection,
|
||||||
) {}
|
) {}
|
||||||
@RabbitSubscribe({
|
@RabbitSubscribe({
|
||||||
exchange: 'new-pipeline-task',
|
exchange: 'new-pipeline-task',
|
||||||
@ -201,6 +206,15 @@ export class PipelineTaskRunner {
|
|||||||
messageType,
|
messageType,
|
||||||
status,
|
status,
|
||||||
};
|
};
|
||||||
|
this.amqpConnection
|
||||||
|
.publish(EXCHANGE_PIPELINE_TASK_FANOUT, ROUTE_PIPELINE_TASK_LOG, event)
|
||||||
|
.catch((error) => {
|
||||||
|
this.logger.error(
|
||||||
|
{ error, event },
|
||||||
|
'send event message to queue failed. %s',
|
||||||
|
error.message,
|
||||||
|
);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async runScript(
|
async runScript(
|
||||||
|
6
src/pipeline-tasks/pipeline-tasks.constants.ts
Normal file
6
src/pipeline-tasks/pipeline-tasks.constants.ts
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
export const EXCHANGE_PIPELINE_TASK_TOPIC = 'pipeline-task.topic';
|
||||||
|
export const EXCHANGE_PIPELINE_TASK_FANOUT = 'pipeline-task.fanout';
|
||||||
|
export const ROUTE_PIPELINE_TASK_LOG = 'pipeline-task-log';
|
||||||
|
export const QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT = 'pipeline-task-log';
|
||||||
|
export const QUEUE_WRITE_PIPELINE_TASK_LOG = 'write-pipeline-task-log';
|
||||||
|
export const ROUTE_PIPELINE_TASK_DONE = 'pipeline-task-done';
|
@ -10,6 +10,12 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
|
|||||||
import { ConfigModule, ConfigService } from '@nestjs/config';
|
import { ConfigModule, ConfigService } from '@nestjs/config';
|
||||||
import { PipelineTaskRunner } from './pipeline-task.runner';
|
import { PipelineTaskRunner } from './pipeline-task.runner';
|
||||||
import { spawn } from 'child_process';
|
import { spawn } from 'child_process';
|
||||||
|
import {
|
||||||
|
EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
} from './pipeline-tasks.constants';
|
||||||
|
import { PipelineTaskLogger } from './pipeline-task.logger';
|
||||||
|
import { PipelineTaskFlushService } from './pipeline-task-flush.service';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@ -27,6 +33,7 @@ import { spawn } from 'child_process';
|
|||||||
type: 'fanout',
|
type: 'fanout',
|
||||||
options: {
|
options: {
|
||||||
durable: true,
|
durable: true,
|
||||||
|
autoDelete: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -34,6 +41,7 @@ import { spawn } from 'child_process';
|
|||||||
type: 'fanout',
|
type: 'fanout',
|
||||||
options: {
|
options: {
|
||||||
durable: true,
|
durable: true,
|
||||||
|
autoDelete: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -41,6 +49,23 @@ import { spawn } from 'child_process';
|
|||||||
type: 'fanout',
|
type: 'fanout',
|
||||||
options: {
|
options: {
|
||||||
durable: false,
|
durable: false,
|
||||||
|
autoDelete: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: EXCHANGE_PIPELINE_TASK_FANOUT,
|
||||||
|
type: 'fanout',
|
||||||
|
options: {
|
||||||
|
durable: false,
|
||||||
|
autoDelete: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: EXCHANGE_PIPELINE_TASK_TOPIC,
|
||||||
|
type: 'topic',
|
||||||
|
options: {
|
||||||
|
durable: false,
|
||||||
|
autoDelete: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
@ -52,10 +77,12 @@ import { spawn } from 'child_process';
|
|||||||
PipelineTasksService,
|
PipelineTasksService,
|
||||||
PipelineTasksResolver,
|
PipelineTasksResolver,
|
||||||
PipelineTaskRunner,
|
PipelineTaskRunner,
|
||||||
|
PipelineTaskLogger,
|
||||||
{
|
{
|
||||||
provide: 'spawn',
|
provide: 'spawn',
|
||||||
useValue: spawn,
|
useValue: spawn,
|
||||||
},
|
},
|
||||||
|
PipelineTaskFlushService,
|
||||||
],
|
],
|
||||||
exports: [PipelineTasksService],
|
exports: [PipelineTasksService],
|
||||||
})
|
})
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import { Test, TestingModule } from '@nestjs/testing';
|
import { Test, TestingModule } from '@nestjs/testing';
|
||||||
|
import { PipelineTaskLogger } from './pipeline-task.logger';
|
||||||
import { PipelineTasksResolver } from './pipeline-tasks.resolver';
|
import { PipelineTasksResolver } from './pipeline-tasks.resolver';
|
||||||
import { PipelineTasksService } from './pipeline-tasks.service';
|
import { PipelineTasksService } from './pipeline-tasks.service';
|
||||||
|
|
||||||
@ -13,6 +14,10 @@ describe('PipelineTasksResolver', () => {
|
|||||||
provide: PipelineTasksService,
|
provide: PipelineTasksService,
|
||||||
useValue: {},
|
useValue: {},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: PipelineTaskLogger,
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -2,29 +2,35 @@ import { Resolver, Args, Mutation, Subscription, Query } from '@nestjs/graphql';
|
|||||||
import { PipelineTask } from './pipeline-task.entity';
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
import { PipelineTasksService } from './pipeline-tasks.service';
|
import { PipelineTasksService } from './pipeline-tasks.service';
|
||||||
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
||||||
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
|
|
||||||
import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args';
|
import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args';
|
||||||
import { plainToClass } from 'class-transformer';
|
import { plainToClass } from 'class-transformer';
|
||||||
|
import { PipelineTaskLogger } from './pipeline-task.logger';
|
||||||
|
import { observableToAsyncIterable } from '@graphql-tools/utils';
|
||||||
|
import { PipelineTaskEvent } from './models/pipeline-task-event';
|
||||||
|
|
||||||
@Resolver()
|
@Resolver()
|
||||||
export class PipelineTasksResolver {
|
export class PipelineTasksResolver {
|
||||||
constructor(private readonly service: PipelineTasksService) {}
|
constructor(
|
||||||
|
private readonly service: PipelineTasksService,
|
||||||
|
private readonly taskLogger: PipelineTaskLogger,
|
||||||
|
) {}
|
||||||
|
|
||||||
@Mutation(() => PipelineTask)
|
@Mutation(() => PipelineTask)
|
||||||
async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) {
|
async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) {
|
||||||
return await this.service.addTask(taskDto);
|
return await this.service.addTask(taskDto);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscription(() => PipelineTaskLogMessage, {
|
@Subscription(() => PipelineTaskEvent, {
|
||||||
resolve: (value) => {
|
resolve: (value) => {
|
||||||
const data = plainToClass(PipelineTaskLogMessage, value);
|
const data = plainToClass(PipelineTaskEvent, value);
|
||||||
return data;
|
return data;
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) {
|
async pipelineTaskEvent(@Args() args: PipelineTaskLogArgs) {
|
||||||
// const task = await this.service.findTaskById(args.taskId);
|
const task = await this.service.findTaskById(args.taskId);
|
||||||
// const asyncIterator = this.logsService.watchLogs(task);
|
return observableToAsyncIterable<PipelineTaskEvent>(
|
||||||
// return asyncIterator;
|
this.taskLogger.getMessage$(task.id),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscription(() => PipelineTask, {
|
@Subscription(() => PipelineTask, {
|
||||||
|
@ -51,7 +51,10 @@ export class PipelineTasksService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async listTasksByCommitHash(hash: string) {
|
async listTasksByCommitHash(hash: string) {
|
||||||
return await this.repository.find({ commit: hash });
|
return await this.repository.find({
|
||||||
|
where: { commit: hash },
|
||||||
|
order: { createdAt: 'DESC' },
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
getRedisTokens(pipeline: Pipeline): [string, string] {
|
getRedisTokens(pipeline: Pipeline): [string, string] {
|
||||||
|
Loading…
Reference in New Issue
Block a user