Compare commits
	
		
			2 Commits
		
	
	
		
			a82f663354
			...
			24a2f80e46
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 24a2f80e46 | ||
|  | 0e0781c4c4 | 
| @@ -1,8 +1,8 @@ | ||||
| import { DEFAULT_PUB_SUB_NAME } from '../pub-sub.constants'; | ||||
|  | ||||
| export function getPubSubToken(name) { | ||||
| export function getPubSubToken(name = DEFAULT_PUB_SUB_NAME) { | ||||
|   return `app:pub-usb:${name || DEFAULT_PUB_SUB_NAME}`; | ||||
| } | ||||
| export function getPubSubConfigToken(name) { | ||||
| export function getPubSubConfigToken(name = DEFAULT_PUB_SUB_NAME) { | ||||
|   return `app:pub-usb:config:${name || DEFAULT_PUB_SUB_NAME}`; | ||||
| } | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| import { TaskStatuses } from '../enums/task-statuses.enum'; | ||||
| import { PipelineUnits } from '../enums/pipeline-units.enum'; | ||||
| import { Field, ObjectType } from '@nestjs/graphql'; | ||||
| import { Type } from 'class-transformer'; | ||||
|  | ||||
| @ObjectType() | ||||
| export class PipelineTaskLogs { | ||||
| @@ -8,7 +9,9 @@ export class PipelineTaskLogs { | ||||
|   unit: PipelineUnits; | ||||
|   @Field(() => TaskStatuses) | ||||
|   status: TaskStatuses; | ||||
|   @Type(() => Date) | ||||
|   startedAt?: Date; | ||||
|   @Type(() => Date) | ||||
|   endedAt?: Date; | ||||
|   logs = ''; | ||||
| } | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| import { Test, TestingModule } from '@nestjs/testing'; | ||||
| import { PipelineTaskLogsService } from './pipeline-task-logs.service'; | ||||
| import { RedisService } from 'nestjs-redis'; | ||||
| import { getPubSubToken } from '../commons/pub-sub/utils/token'; | ||||
|  | ||||
| describe('PipelineTaskLogsService', () => { | ||||
|   let service: PipelineTaskLogsService; | ||||
| @@ -13,6 +14,10 @@ describe('PipelineTaskLogsService', () => { | ||||
|           provide: RedisService, | ||||
|           useValue: {}, | ||||
|         }, | ||||
|         { | ||||
|           provide: getPubSubToken(), | ||||
|           useValue: {}, | ||||
|         }, | ||||
|       ], | ||||
|     }).compile(); | ||||
|  | ||||
|   | ||||
| @@ -174,7 +174,7 @@ describe('PipelineTaskConsumer', () => { | ||||
|  | ||||
|       await consumer.doTask(job); | ||||
|  | ||||
|       expect(updateTask).toHaveBeenCalledTimes(2); | ||||
|       expect(updateTask).toHaveBeenCalledTimes(4); | ||||
|       expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); | ||||
|       expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); | ||||
|       expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.success); | ||||
| @@ -192,7 +192,7 @@ describe('PipelineTaskConsumer', () => { | ||||
|  | ||||
|       await consumer.doTask(job); | ||||
|  | ||||
|       expect(updateTask).toHaveBeenCalledTimes(2); | ||||
|       expect(updateTask).toHaveBeenCalledTimes(4); | ||||
|       expect(updateTask.mock.calls[0][0].startedAt).toBeDefined(); | ||||
|       expect(updateTask.mock.calls[1][0].endedAt).toBeDefined(); | ||||
|       expect(updateTask.mock.calls[1][0].status).toEqual(TaskStatuses.failed); | ||||
| @@ -211,7 +211,7 @@ describe('PipelineTaskConsumer', () => { | ||||
|       await consumer.doTask(job); | ||||
|  | ||||
|       expect(runScript).toHaveBeenCalledTimes(1); | ||||
|       expect(updateTask).toHaveBeenCalledTimes(2); | ||||
|       expect(updateTask).toHaveBeenCalledTimes(4); | ||||
|       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; | ||||
|       expect(taskDto.logs).toHaveLength(2); | ||||
|       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); | ||||
| @@ -232,7 +232,7 @@ describe('PipelineTaskConsumer', () => { | ||||
|  | ||||
|       await consumer.doTask(job); | ||||
|  | ||||
|       expect(updateTask).toHaveBeenCalledTimes(2); | ||||
|       expect(updateTask).toHaveBeenCalledTimes(4); | ||||
|       const taskDto: PipelineTask = updateTask.mock.calls[0][0]; | ||||
|       expect(taskDto.logs).toHaveLength(2); | ||||
|       expect(taskDto.logs[0].status).toEqual(TaskStatuses.success); | ||||
|   | ||||
| @@ -86,6 +86,7 @@ export class PipelineTaskConsumer { | ||||
|                 taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '', | ||||
|             ); | ||||
|           task.logs.push(unitLog); | ||||
|           task = await this.service.updateTask(task); | ||||
|           await job.update(task); | ||||
|         } | ||||
|       } | ||||
|   | ||||
| @@ -1,11 +1,16 @@ | ||||
| import { AppBaseEntity } from './../commons/entities/app-base-entity'; | ||||
| import { Field, ObjectType } from '@nestjs/graphql'; | ||||
| import { Column, Entity, ManyToOne } from 'typeorm'; | ||||
| import { ObjectType } from '@nestjs/graphql'; | ||||
| import { Column, Entity, ManyToOne, ValueTransformer } from 'typeorm'; | ||||
| import { Pipeline } from '../pipelines/pipeline.entity'; | ||||
| import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; | ||||
| import { TaskStatuses } from './enums/task-statuses.enum'; | ||||
| import { PipelineUnits } from './enums/pipeline-units.enum'; | ||||
| import { plainToClass } from 'class-transformer'; | ||||
|  | ||||
| const logsTransformer: ValueTransformer = { | ||||
|   from: (value) => plainToClass(PipelineTaskLogs, value), | ||||
|   to: (value) => value, | ||||
| }; | ||||
| @ObjectType() | ||||
| @Entity() | ||||
| export class PipelineTask extends AppBaseEntity { | ||||
| @@ -20,7 +25,7 @@ export class PipelineTask extends AppBaseEntity { | ||||
|   @Column({ type: 'enum', enum: PipelineUnits, array: true }) | ||||
|   units: PipelineUnits[]; | ||||
|  | ||||
|   @Column({ type: 'jsonb', default: '[]' }) | ||||
|   @Column({ type: 'jsonb', default: '[]', transformer: logsTransformer }) | ||||
|   logs: PipelineTaskLogs[]; | ||||
|  | ||||
|   @Column({ type: 'enum', enum: TaskStatuses, default: TaskStatuses.pending }) | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import { EntityNotFoundError } from 'typeorm/error/EntityNotFoundError'; | ||||
| import { Repository } from 'typeorm'; | ||||
| import { Queue } from 'bull'; | ||||
| import { LockFailedException } from '../commons/exceptions/lock-failed.exception'; | ||||
| import { getPubSubToken } from '../commons/pub-sub/utils/token'; | ||||
|  | ||||
| describe('PipelineTasksService', () => { | ||||
|   let service: PipelineTasksService; | ||||
| @@ -68,6 +69,10 @@ describe('PipelineTasksService', () => { | ||||
|             getClient: jest.fn(() => redisClient), | ||||
|           }, | ||||
|         }, | ||||
|         { | ||||
|           provide: getPubSubToken(), | ||||
|           useValue: {}, | ||||
|         }, | ||||
|       ], | ||||
|     }).compile(); | ||||
|  | ||||
|   | ||||
| @@ -121,13 +121,14 @@ export class PipelineTasksService { | ||||
|   } | ||||
|  | ||||
|   async updateTask(task: PipelineTask) { | ||||
|     this.pubSub.publish(`task:${task.id}`, task); | ||||
|     this.pubSub.publish(`pipeline-task:${task.id}`, task); | ||||
|     return await this.repository.save(task); | ||||
|   } | ||||
|  | ||||
|   async watchTaskUpdated(id: string) { | ||||
|     log('watchTaskUpdated %s', id); | ||||
|     return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); | ||||
|     return observableToAsyncIterable( | ||||
|       this.pubSub.message$(`pipeline-task:${id}`), | ||||
|     ); | ||||
|   } | ||||
|  | ||||
|   getRedisTokens(pipeline: Pipeline): [string, string] { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user