feat(pipeline-task): add pipelineTaskEvent api.

This commit is contained in:
Ivan Li 2021-06-05 21:34:19 +08:00
parent 7091f9df6a
commit 20612d4301
7 changed files with 40 additions and 10 deletions

View File

@ -1,13 +1,25 @@
import { Field, ObjectType } from '@nestjs/graphql';
import { PipelineUnits } from '../enums/pipeline-units.enum'; import { PipelineUnits } from '../enums/pipeline-units.enum';
import { TaskStatuses } from '../enums/task-statuses.enum'; import { TaskStatuses } from '../enums/task-statuses.enum';
import { Type } from 'class-transformer';
@ObjectType()
export class PipelineTaskEvent { export class PipelineTaskEvent {
@Field()
taskId: string; taskId: string;
@Field()
pipelineId: string; pipelineId: string;
@Field()
projectId: string; projectId: string;
@Field(() => PipelineUnits, { nullable: true })
unit: PipelineUnits | null; unit: PipelineUnits | null;
@Field()
@Type(() => Date)
emittedAt: Date; emittedAt: Date;
@Field()
message: string; message: string;
@Field()
messageType: 'stdout' | 'stderr' | 'stdin'; messageType: 'stdout' | 'stderr' | 'stdin';
@Field(() => TaskStatuses)
status: TaskStatuses; status: TaskStatuses;
} }

View File

@ -23,6 +23,7 @@ describe('PipelineTaskRunner', () => {
it('normal', async () => { it('normal', async () => {
const event = new PipelineTaskEvent(); const event = new PipelineTaskEvent();
event.taskId = 'test'; event.taskId = 'test';
event.emittedAt = new Date().toISOString() as any;
const message$ = logger.getMessage$('test'); const message$ = logger.getMessage$('test');
let receiveEvent; let receiveEvent;

View File

@ -1,5 +1,6 @@
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common'; import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { plainToClass } from 'class-transformer';
import { Observable, Subject } from 'rxjs'; import { Observable, Subject } from 'rxjs';
import { filter, publish, tap } from 'rxjs/operators'; import { filter, publish, tap } from 'rxjs/operators';
import { PipelineTaskEvent } from './models/pipeline-task-event'; import { PipelineTaskEvent } from './models/pipeline-task-event';
@ -19,7 +20,7 @@ export class PipelineTaskLogger implements OnModuleDestroy {
queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT, queue: QUEUE_HANDLE_PIPELINE_TASK_LOG_EVENT,
}) })
async handleEvent(message: PipelineTaskEvent) { async handleEvent(message: PipelineTaskEvent) {
this.messageSubject.next(message); this.messageSubject.next(plainToClass(PipelineTaskEvent, message));
} }
getMessage$(taskId: string) { getMessage$(taskId: string) {

View File

@ -11,6 +11,7 @@ import { ConfigModule, ConfigService } from '@nestjs/config';
import { PipelineTaskRunner } from './pipeline-task.runner'; import { PipelineTaskRunner } from './pipeline-task.runner';
import { spawn } from 'child_process'; import { spawn } from 'child_process';
import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants'; import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants';
import { PipelineTaskLogger } from './pipeline-task.logger';
@Module({ @Module({
imports: [ imports: [
@ -64,6 +65,7 @@ import { EXCHANGE_PIPELINE_TASK_FANOUT } from './pipeline-tasks.constants';
PipelineTasksService, PipelineTasksService,
PipelineTasksResolver, PipelineTasksResolver,
PipelineTaskRunner, PipelineTaskRunner,
PipelineTaskLogger,
{ {
provide: 'spawn', provide: 'spawn',
useValue: spawn, useValue: spawn,

View File

@ -1,4 +1,5 @@
import { Test, TestingModule } from '@nestjs/testing'; import { Test, TestingModule } from '@nestjs/testing';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { PipelineTasksResolver } from './pipeline-tasks.resolver'; import { PipelineTasksResolver } from './pipeline-tasks.resolver';
import { PipelineTasksService } from './pipeline-tasks.service'; import { PipelineTasksService } from './pipeline-tasks.service';
@ -13,6 +14,10 @@ describe('PipelineTasksResolver', () => {
provide: PipelineTasksService, provide: PipelineTasksService,
useValue: {}, useValue: {},
}, },
{
provide: PipelineTaskLogger,
useValue: {},
},
], ],
}).compile(); }).compile();

View File

@ -2,29 +2,35 @@ import { Resolver, Args, Mutation, Subscription, Query } from '@nestjs/graphql';
import { PipelineTask } from './pipeline-task.entity'; import { PipelineTask } from './pipeline-task.entity';
import { PipelineTasksService } from './pipeline-tasks.service'; import { PipelineTasksService } from './pipeline-tasks.service';
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input'; import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
import { PipelineTaskLogMessage } from './models/pipeline-task-log-message.module';
import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args'; import { PipelineTaskLogArgs } from './dtos/pipeline-task-log.args';
import { plainToClass } from 'class-transformer'; import { plainToClass } from 'class-transformer';
import { PipelineTaskLogger } from './pipeline-task.logger';
import { observableToAsyncIterable } from '@graphql-tools/utils';
import { PipelineTaskEvent } from './models/pipeline-task-event';
@Resolver() @Resolver()
export class PipelineTasksResolver { export class PipelineTasksResolver {
constructor(private readonly service: PipelineTasksService) {} constructor(
private readonly service: PipelineTasksService,
private readonly taskLogger: PipelineTaskLogger,
) {}
@Mutation(() => PipelineTask) @Mutation(() => PipelineTask)
async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) { async createPipelineTask(@Args('task') taskDto: CreatePipelineTaskInput) {
return await this.service.addTask(taskDto); return await this.service.addTask(taskDto);
} }
@Subscription(() => PipelineTaskLogMessage, { @Subscription(() => PipelineTaskEvent, {
resolve: (value) => { resolve: (value) => {
const data = plainToClass(PipelineTaskLogMessage, value); const data = plainToClass(PipelineTaskEvent, value);
return data; return data;
}, },
}) })
async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) { async pipelineTaskEvent(@Args() args: PipelineTaskLogArgs) {
// const task = await this.service.findTaskById(args.taskId); const task = await this.service.findTaskById(args.taskId);
// const asyncIterator = this.logsService.watchLogs(task); return observableToAsyncIterable<PipelineTaskEvent>(
// return asyncIterator; this.taskLogger.getMessage$(task.id),
);
} }
@Subscription(() => PipelineTask, { @Subscription(() => PipelineTask, {

View File

@ -51,7 +51,10 @@ export class PipelineTasksService {
} }
async listTasksByCommitHash(hash: string) { async listTasksByCommitHash(hash: string) {
return await this.repository.find({ commit: hash }); return await this.repository.find({
where: { commit: hash },
order: { createdAt: 'DESC' },
});
} }
getRedisTokens(pipeline: Pipeline): [string, string] { getRedisTokens(pipeline: Pipeline): [string, string] {