Compare commits
2 Commits
a82f663354
...
24a2f80e46
Author | SHA1 | Date | |
---|---|---|---|
|
24a2f80e46 | ||
|
0e0781c4c4 |
@ -1,8 +1,8 @@
|
|||||||
import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants';
|
import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants';
|
||||||
|
|
||||||
export function getPubSubToken(name) {
|
export function getPubSubToken(name = DEFAULT_PUB_SUB_NAME) {
|
||||||
return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`;
|
return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`;
|
||||||
}
|
}
|
||||||
export function getPubSubConfigToken(name) {
|
export function getPubSubConfigToken(name = DEFAULT_PUB_SUB_NAME) {
|
||||||
return `app:pub-usb:config:${name || DEFAULT_PUB_SUB_NAME}`;
|
return `app:pub-usb:config:${name || DEFAULT_PUB_SUB_NAME}`;
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import { TaskStatuses } from '../enums/task-statuses.enum';
|
import { TaskStatuses } from '../enums/task-statuses.enum';
|
||||||
import { PipelineUnits } from '../enums/pipeline-units.enum';
|
import { PipelineUnits } from '../enums/pipeline-units.enum';
|
||||||
import { Field, ObjectType } from '@nestjs/graphql';
|
import { Field, ObjectType } from '@nestjs/graphql';
|
||||||
|
import { Type } from 'class-transformer';
|
||||||
|
|
||||||
@ObjectType()
|
@ObjectType()
|
||||||
export class PipelineTaskLogs {
|
export class PipelineTaskLogs {
|
||||||
@ -8,7 +9,9 @@ export class PipelineTaskLogs {
|
|||||||
unit: PipelineUnits;
|
unit: PipelineUnits;
|
||||||
@Field(() => TaskStatuses)
|
@Field(() => TaskStatuses)
|
||||||
status: TaskStatuses;
|
status: TaskStatuses;
|
||||||
|
@Type(() => Date)
|
||||||
startedAt?: Date;
|
startedAt?: Date;
|
||||||
|
@Type(() => Date)
|
||||||
endedAt?: Date;
|
endedAt?: Date;
|
||||||
logs = '';
|
logs = '';
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import { Test, TestingModule } from '@nestjs/testing';
|
import { Test, TestingModule } from '@nestjs/testing';
|
||||||
import { PipelineTaskLogsService } from './pipeline-task-logs.service';
|
import { PipelineTaskLogsService } from './pipeline-task-logs.service';
|
||||||
import { RedisService } from 'nestjs-redis';
|
import { RedisService } from 'nestjs-redis';
|
||||||
|
import { getPubSubToken } from '../commons/pub-sub/utils/token';
|
||||||
|
|
||||||
describe('PipelineTaskLogsService', () => {
|
describe('PipelineTaskLogsService', () => {
|
||||||
let service: PipelineTaskLogsService;
|
let service: PipelineTaskLogsService;
|
||||||
@ -13,6 +14,10 @@ describe('PipelineTaskLogsService', () => {
|
|||||||
provide: RedisService,
|
provide: RedisService,
|
||||||
useValue: {},
|
useValue: {},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: getPubSubToken(),
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ describe('PipelineTaskConsumer', () => {
|
|||||||
|
|
||||||
await consumer.doTask(job);
|
await consumer.doTask(job);
|
||||||
|
|
||||||
expect(updateTask).toHaveBeenCalledTimes(2);
|
expect(updateTask).toHaveBeenCalledTimes(4);
|
||||||
expect(updateTask.mock.calls[0][0].startedAt).toBeDefined();
|
expect(updateTask.mock.calls[0][0].startedAt).toBeDefined();
|
||||||
expect(updateTask.mock.calls[1][0].endedAt).toBeDefined();
|
expect(updateTask.mock.calls[1][0].endedAt).toBeDefined();
|
||||||
expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.success);
|
expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.success);
|
||||||
@ -192,7 +192,7 @@ describe('PipelineTaskConsumer', () => {
|
|||||||
|
|
||||||
await consumer.doTask(job);
|
await consumer.doTask(job);
|
||||||
|
|
||||||
expect(updateTask).toHaveBeenCalledTimes(2);
|
expect(updateTask).toHaveBeenCalledTimes(4);
|
||||||
expect(updateTask.mock.calls[0][0].startedAt).toBeDefined();
|
expect(updateTask.mock.calls[0][0].startedAt).toBeDefined();
|
||||||
expect(updateTask.mock.calls[1][0].endedAt).toBeDefined();
|
expect(updateTask.mock.calls[1][0].endedAt).toBeDefined();
|
||||||
expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.failed);
|
expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.failed);
|
||||||
@ -211,7 +211,7 @@ describe('PipelineTaskConsumer', () => {
|
|||||||
await consumer.doTask(job);
|
await consumer.doTask(job);
|
||||||
|
|
||||||
expect(runScript).toHaveBeenCalledTimes(1);
|
expect(runScript).toHaveBeenCalledTimes(1);
|
||||||
expect(updateTask).toHaveBeenCalledTimes(2);
|
expect(updateTask).toHaveBeenCalledTimes(4);
|
||||||
const taskDto: PipelineTask = updateTask.mock.calls[0][0];
|
const taskDto: PipelineTask = updateTask.mock.calls[0][0];
|
||||||
expect(taskDto.logs).toHaveLength(2);
|
expect(taskDto.logs).toHaveLength(2);
|
||||||
expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
|
expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
|
||||||
@ -232,7 +232,7 @@ describe('PipelineTaskConsumer', () => {
|
|||||||
|
|
||||||
await consumer.doTask(job);
|
await consumer.doTask(job);
|
||||||
|
|
||||||
expect(updateTask).toHaveBeenCalledTimes(2);
|
expect(updateTask).toHaveBeenCalledTimes(4);
|
||||||
const taskDto: PipelineTask = updateTask.mock.calls[0][0];
|
const taskDto: PipelineTask = updateTask.mock.calls[0][0];
|
||||||
expect(taskDto.logs).toHaveLength(2);
|
expect(taskDto.logs).toHaveLength(2);
|
||||||
expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
|
expect(taskDto.logs[0].status).toEqual(TaskStatuses.success);
|
||||||
|
@ -86,6 +86,7 @@ export class PipelineTaskConsumer {
|
|||||||
taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '',
|
taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '',
|
||||||
);
|
);
|
||||||
task.logs.push(unitLog);
|
task.logs.push(unitLog);
|
||||||
|
task = await this.service.updateTask(task);
|
||||||
await job.update(task);
|
await job.update(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,16 @@
|
|||||||
import { AppBaseEntity } from './../commons/entities/app-base-entity';
|
import { AppBaseEntity } from './../commons/entities/app-base-entity';
|
||||||
import { Field, ObjectType } from '@nestjs/graphql';
|
import { ObjectType } from '@nestjs/graphql';
|
||||||
import { Column, Entity, ManyToOne } from 'typeorm';
|
import { Column, Entity, ManyToOne, ValueTransformer } from 'typeorm';
|
||||||
import { Pipeline } from '../pipelines/pipeline.entity';
|
import { Pipeline } from '../pipelines/pipeline.entity';
|
||||||
import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
|
import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
|
||||||
import { TaskStatuses } from './enums/task-statuses.enum';
|
import { TaskStatuses } from './enums/task-statuses.enum';
|
||||||
import { PipelineUnits } from './enums/pipeline-units.enum';
|
import { PipelineUnits } from './enums/pipeline-units.enum';
|
||||||
|
import { plainToClass } from 'class-transformer';
|
||||||
|
|
||||||
|
const logsTransformer: ValueTransformer = {
|
||||||
|
from: (value) => plainToClass(PipelineTaskLogs, value),
|
||||||
|
to: (value) => value,
|
||||||
|
};
|
||||||
@ObjectType()
|
@ObjectType()
|
||||||
@Entity()
|
@Entity()
|
||||||
export class PipelineTask extends AppBaseEntity {
|
export class PipelineTask extends AppBaseEntity {
|
||||||
@ -20,7 +25,7 @@ export class PipelineTask extends AppBaseEntity {
|
|||||||
@Column({ type: 'enum', enum: PipelineUnits, array: true })
|
@Column({ type: 'enum', enum: PipelineUnits, array: true })
|
||||||
units: PipelineUnits[];
|
units: PipelineUnits[];
|
||||||
|
|
||||||
@Column({ type: 'jsonb', default: '[]' })
|
@Column({ type: 'jsonb', default: '[]', transformer: logsTransformer })
|
||||||
logs: PipelineTaskLogs[];
|
logs: PipelineTaskLogs[];
|
||||||
|
|
||||||
@Column({ type: 'enum', enum: TaskStatuses, default: TaskStatuses.pending })
|
@Column({ type: 'enum', enum: TaskStatuses, default: TaskStatuses.pending })
|
||||||
|
@ -10,6 +10,7 @@ import { EntityNotFoundError } from 'typeorm/error/EntityNotFoundError';
|
|||||||
import { Repository } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
import { Queue } from 'bull';
|
import { Queue } from 'bull';
|
||||||
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';
|
import { LockFailedException } from '../commons/exceptions/lock-failed.exception';
|
||||||
|
import { getPubSubToken } from '../commons/pub-sub/utils/token';
|
||||||
|
|
||||||
describe('PipelineTasksService', () => {
|
describe('PipelineTasksService', () => {
|
||||||
let service: PipelineTasksService;
|
let service: PipelineTasksService;
|
||||||
@ -68,6 +69,10 @@ describe('PipelineTasksService', () => {
|
|||||||
getClient: jest.fn(() => redisClient),
|
getClient: jest.fn(() => redisClient),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: getPubSubToken(),
|
||||||
|
useValue: {},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
@ -121,13 +121,14 @@ export class PipelineTasksService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async updateTask(task: PipelineTask) {
|
async updateTask(task: PipelineTask) {
|
||||||
this.pubSub.publish(`task:${task.id}`, task);
|
this.pubSub.publish(`pipeline-task:${task.id}`, task);
|
||||||
return await this.repository.save(task);
|
return await this.repository.save(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
async watchTaskUpdated(id: string) {
|
async watchTaskUpdated(id: string) {
|
||||||
log('watchTaskUpdated %s', id);
|
return observableToAsyncIterable(
|
||||||
return observableToAsyncIterable(this.pubSub.message$(`task:${id}`));
|
this.pubSub.message$(`pipeline-task:${id}`),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
getRedisTokens(pipeline: Pipeline): [string, string] {
|
getRedisTokens(pipeline: Pipeline): [string, string] {
|
||||||
|
Loading…
Reference in New Issue
Block a user