From 20612d4301bf4946bbd0f7139fd12727d99c67a3 Mon Sep 17 00:00:00 2001 From: Ivan Li Date: Sat, 5 Jun 2021 21:34:19 +0800 Subject: [PATCH] feat(pipeline-task): add pipelineTaskEvent api. --- .../models/pipeline-task-event.ts | 12 ++++++++++ .../pipeline-task.logger.spec.ts | 1 + src/pipeline-tasks/pipeline-task.logger.ts | 3 ++- src/pipeline-tasks/pipeline-tasks.module.ts | 2 ++ .../pipeline-tasks.resolver.spec.ts | 5 +++++ src/pipeline-tasks/pipeline-tasks.resolver.ts | 22 ++++++++++++------- src/pipeline-tasks/pipeline-tasks.service.ts | 5 ++++- 7 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/pipeline-tasks/models/pipeline-task-event.ts b/src/pipeline-tasks/models/pipeline-task-event.ts index 8021300..fe02c33 100644 --- a/src/pipeline-tasks/models/pipeline-task-event.ts +++ b/src/pipeline-tasks/models/pipeline-task-event.ts @@ -1,13 +1,25 @@ +import { Field, ObjectType } from '@nestjs/graphql'; import { PipelineUnits } from '../enums/pipeline-units.enum'; import { TaskStatuses } from '../enums/task-statuses.enum'; +import { Type } from 'class-transformer'; +@ObjectType() export class PipelineTaskEvent { + @Field() taskId: string; + @Field() pipelineId: string; + @Field() projectId: string; + @Field(() => PipelineUnits, { nullable: true }) unit: PipelineUnits | null; + @Field() + @Type(() => Date) emittedAt: Date; + @Field() message: string; + @Field() messageType: 'stdout' | 'stderr' | 'stdin'; + @Field(() => TaskStatuses) status: TaskStatuses; } diff --git a/src/pipeline-tasks/pipeline-task.logger.spec.ts b/src/pipeline-tasks/pipeline-task.logger.spec.ts index 7a2da27..0da2696 100644 --- a/src/pipeline-tasks/pipeline-task.logger.spec.ts +++ b/src/pipeline-tasks/pipeline-task.logger.spec.ts @@ -23,6 +23,7 @@ describe('PipelineTaskRunner', () => { it('normal', async () => { const event = new PipelineTaskEvent(); event.taskId = 'test'; + event.emittedAt = new Date().toISOString() as any; const message$ = logger.getMessage$('test'); let receiveEvent; diff --git a/src/pipeline-tasks/pipeline-task.logger.ts b/src/pipeline-tasks/pipeline-task.logger.ts index e729450..096b6c7 100644 --- a/src/pipeline-tasks/pipeline-task.logger.ts +++ b/src/pipeline-tasks/pipeline-task.logger.ts @@ -1,5 +1,6 @@ import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { plainToClass } from 'class-transformer'; import { Observable, Subject } from 'rxjs'; import { filter, publish, tap } from 'rxjs/operators'; import { PipelineTaskEvent } from './models/pipeline-task-event'; @@ -19,7 +20,7 @@ export class PipelineTaskLogger implements OnModuleDestroy { queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT, }) async handleEvent(message: PipelineTaskEvent) { - this.messageSubject.next(message); + this.messageSubject.next(plainToClass(PipelineTaskEvent, message)); } getMessage$(taskId: string) { diff --git a/src/pipeline-tasks/pipeline-tasks.module.ts b/src/pipeline-tasks/pipeline-tasks.module.ts index 1067c27..428c72d 100644 --- a/src/pipeline-tasks/pipeline-tasks.module.ts +++ b/src/pipeline-tasks/pipeline-tasks.module.ts @@ -11,6 +11,7 @@ import { ConfigModule, ConfigService } from '@nestjs/config'; import { PipelineTaskRunner } from './pipeline-task.runner'; import { spawn } from 'child_process'; import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants'; +import { PipelineTaskLogger } from './pipeline-task.logger'; @Module({ imports: [ @@ -64,6 +65,7 @@ import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants'; PipelineTasksService, PipelineTasksResolver, PipelineTaskRunner, + PipelineTaskLogger, { provide: 'spawn', useValue: spawn, diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.spec.ts b/src/pipeline-tasks/pipeline-tasks.resolver.spec.ts index a4bf65d..aed0489 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.spec.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.spec.ts @@ -1,4 +1,5 @@ import { Test, TestingModule } from '@nestjs/testing'; +import { PipelineTaskLogger } from './pipeline-task.logger'; import { PipelineTasksResolver } from './pipeline-tasks.resolver'; import { PipelineTasksService } from './pipeline-tasks.service'; @@ -13,6 +14,10 @@ describe('PipelineTasksResolver', () => { provide: PipelineTasksService, useValue: {}, }, + { + provide: PipelineTaskLogger, + useValue: {}, + }, ], }).compile(); diff --git a/src/pipeline-tasks/pipeline-tasks.resolver.ts b/src/pipeline-tasks/pipeline-tasks.resolver.ts index 3dfb00b..079f4aa 100644 --- a/src/pipeline-tasks/pipeline-tasks.resolver.ts +++ b/src/pipeline-tasks/pipeline-tasks.resolver.ts @@ -2,29 +2,35 @@ import { Resolver, Args, Mutation, Subscription, Query } from '@nestjs/graphql'; import { PipelineTask } from './pipeline-task.entity'; import { PipelineTasksService } from './pipeline-tasks.service'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; -import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module'; import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args'; import { plainToClass } from 'class-transformer'; +import { PipelineTaskLogger } from './pipeline-task.logger'; +import { observableToAsyncIterable } from '@graphql-tools/utils'; +import { PipelineTaskEvent } from './models/pipeline-task-event'; @Resolver() export class PipelineTasksResolver { - constructor(private readonly service: PipelineTasksService) {} + constructor( + private readonly service: PipelineTasksService, + private readonly taskLogger: PipelineTaskLogger, + ) {} @Mutation(() => PipelineTask) async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) { return await this.service.addTask(taskDto); } - @Subscription(() => PipelineTaskLogMessage, { + @Subscription(() => PipelineTaskEvent, { resolve: (value) => { - const data = plainToClass(PipelineTaskLogMessage, value); + const data = plainToClass(PipelineTaskEvent, value); return data; }, }) - async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) { - // const task = await this.service.findTaskById(args.taskId); - // const asyncIterator = this.logsService.watchLogs(task); - // return asyncIterator; + async pipelineTaskEvent(@Args() args: PipelineTaskLogArgs) { + const task = await this.service.findTaskById(args.taskId); + return observableToAsyncIterable( + this.taskLogger.getMessage$(task.id), + ); } @Subscription(() => PipelineTask, { diff --git a/src/pipeline-tasks/pipeline-tasks.service.ts b/src/pipeline-tasks/pipeline-tasks.service.ts index 0aeff08..812a3c3 100644 --- a/src/pipeline-tasks/pipeline-tasks.service.ts +++ b/src/pipeline-tasks/pipeline-tasks.service.ts @@ -51,7 +51,10 @@ export class PipelineTasksService { } async listTasksByCommitHash(hash: string) { - return await this.repository.find({ commit: hash }); + return await this.repository.find({ + where: { commit: hash }, + order: { createdAt: 'DESC' }, + }); } getRedisTokens(pipeline: Pipeline): [string, string] {