feat(pipeline-tasks): 任务更新推送时机修改。

This commit is contained in:
Ivan Li 2021-04-05 16:34:12 +08:00
parent a82f663354
commit 0e0781c4c4
4 changed files with 16 additions and 6 deletions

View File

@ -1,6 +1,7 @@
import { TaskStatuses } from '../enums/task-statuses.enum'; import { TaskStatuses } from '../enums/task-statuses.enum';
import { PipelineUnits } from '../enums/pipeline-units.enum'; import { PipelineUnits } from '../enums/pipeline-units.enum';
import { Field, ObjectType } from '@nestjs/graphql'; import { Field, ObjectType } from '@nestjs/graphql';
import { Type } from 'class-transformer';
@ObjectType() @ObjectType()
export class PipelineTaskLogs { export class PipelineTaskLogs {
@ -8,7 +9,9 @@ export class PipelineTaskLogs {
unit: PipelineUnits; unit: PipelineUnits;
@Field(() => TaskStatuses) @Field(() => TaskStatuses)
status: TaskStatuses; status: TaskStatuses;
@Type(() => Date)
startedAt?: Date; startedAt?: Date;
@Type(() => Date)
endedAt?: Date; endedAt?: Date;
logs = ''; logs = '';
} }

View File

@ -86,6 +86,7 @@ export class PipelineTaskConsumer {
taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '', taskLogs.find((tl) => tl.unit === unit.type)?.logs ?? '',
); );
task.logs.push(unitLog); task.logs.push(unitLog);
task = await this.service.updateTask(task);
await job.update(task); await job.update(task);
} }
} }

View File

@ -1,11 +1,16 @@
import { AppBaseEntity } from './../commons/entities/app-base-entity'; import { AppBaseEntity } from './../commons/entities/app-base-entity';
import { Field, ObjectType } from '@nestjs/graphql'; import { ObjectType } from '@nestjs/graphql';
import { Column, Entity, ManyToOne } from 'typeorm'; import { Column, Entity, ManyToOne, ValueTransformer } from 'typeorm';
import { Pipeline } from '../pipelines/pipeline.entity'; import { Pipeline } from '../pipelines/pipeline.entity';
import { PipelineTaskLogs } from './models/pipeline-task-logs.model'; import { PipelineTaskLogs } from './models/pipeline-task-logs.model';
import { TaskStatuses } from './enums/task-statuses.enum'; import { TaskStatuses } from './enums/task-statuses.enum';
import { PipelineUnits } from './enums/pipeline-units.enum'; import { PipelineUnits } from './enums/pipeline-units.enum';
import { plainToClass } from 'class-transformer';
const logsTransformer: ValueTransformer = {
from: (value) => plainToClass(PipelineTaskLogs, value),
to: (value) => value,
};
@ObjectType() @ObjectType()
@Entity() @Entity()
export class PipelineTask extends AppBaseEntity { export class PipelineTask extends AppBaseEntity {
@ -20,7 +25,7 @@ export class PipelineTask extends AppBaseEntity {
@Column({ type: 'enum', enum: PipelineUnits, array: true }) @Column({ type: 'enum', enum: PipelineUnits, array: true })
units: PipelineUnits[]; units: PipelineUnits[];
@Column({ type: 'jsonb', default: '[]' }) @Column({ type: 'jsonb', default: '[]', transformer: logsTransformer })
logs: PipelineTaskLogs[]; logs: PipelineTaskLogs[];
@Column({ type: 'enum', enum: TaskStatuses, default: TaskStatuses.pending }) @Column({ type: 'enum', enum: TaskStatuses, default: TaskStatuses.pending })

View File

@ -121,13 +121,14 @@ export class PipelineTasksService {
} }
async updateTask(task: PipelineTask) { async updateTask(task: PipelineTask) {
this.pubSub.publish(`task:${task.id}`, task); this.pubSub.publish(`pipeline-task:${task.id}`, task);
return await this.repository.save(task); return await this.repository.save(task);
} }
async watchTaskUpdated(id: string) { async watchTaskUpdated(id: string) {
log('watchTaskUpdated %s', id); return observableToAsyncIterable(
return observableToAsyncIterable(this.pubSub.message$(`task:${id}`)); this.pubSub.message$(`pipeline-task:${id}`),
);
} }
getRedisTokens(pipeline: Pipeline): [string, string] { getRedisTokens(pipeline: Pipeline): [string, string] {