feat-pipelines #1
@ -6,16 +6,19 @@ import { Field, HideField, ObjectType } from '@nestjs/graphql';
|
|||||||
export class PipelineTaskLogMessage {
|
export class PipelineTaskLogMessage {
|
||||||
@HideField()
|
@HideField()
|
||||||
task: PipelineTask;
|
task: PipelineTask;
|
||||||
@Field(() => PipelineUnits)
|
@Field(() => PipelineUnits, { nullable: true })
|
||||||
unit: PipelineUnits;
|
unit?: PipelineUnits;
|
||||||
|
@Field()
|
||||||
time: Date;
|
time: Date;
|
||||||
|
@Field()
|
||||||
message: string;
|
message: string;
|
||||||
|
|
||||||
static create(task: PipelineTask, message: string) {
|
static create(task: PipelineTask, unit: PipelineUnits, message: string) {
|
||||||
return Object.assign(new PipelineTaskLogMessage(), {
|
return Object.assign(new PipelineTaskLogMessage(), {
|
||||||
task,
|
task,
|
||||||
message,
|
message,
|
||||||
time: new Date(),
|
time: new Date(),
|
||||||
|
unit,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,12 @@ export class PipelineTaskConsumer {
|
|||||||
}
|
}
|
||||||
for (const script of unit.scripts) {
|
for (const script of unit.scripts) {
|
||||||
unitLog.logs += `[RUN SCRIPT] ${script}`;
|
unitLog.logs += `[RUN SCRIPT] ${script}`;
|
||||||
const messages = await this.runScript(script, workspaceRoot, task);
|
const messages = await this.runScript(
|
||||||
|
script,
|
||||||
|
workspaceRoot,
|
||||||
|
task,
|
||||||
|
unit.type,
|
||||||
|
);
|
||||||
unitLog.logs += messages.join('');
|
unitLog.logs += messages.join('');
|
||||||
}
|
}
|
||||||
unitLog.status = TaskStatuses.success;
|
unitLog.status = TaskStatuses.success;
|
||||||
@ -83,6 +88,7 @@ export class PipelineTaskConsumer {
|
|||||||
script: string,
|
script: string,
|
||||||
workspaceRoot: string,
|
workspaceRoot: string,
|
||||||
task?: PipelineTask,
|
task?: PipelineTask,
|
||||||
|
unit?: PipelineUnits,
|
||||||
): Promise<string[]> {
|
): Promise<string[]> {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const errorMessages: string[] = [];
|
const errorMessages: string[] = [];
|
||||||
@ -95,12 +101,16 @@ export class PipelineTaskConsumer {
|
|||||||
const str = data.toString();
|
const str = data.toString();
|
||||||
errorMessages.push(str);
|
errorMessages.push(str);
|
||||||
logs.push(str);
|
logs.push(str);
|
||||||
this.logsService.recordLog(PipelineTaskLogMessage.create(task, str));
|
this.logsService.recordLog(
|
||||||
|
PipelineTaskLogMessage.create(task, unit, str),
|
||||||
|
);
|
||||||
});
|
});
|
||||||
sub.stdout.on('data', (data: Buffer) => {
|
sub.stdout.on('data', (data: Buffer) => {
|
||||||
const str = data.toString();
|
const str = data.toString();
|
||||||
logs.push(str);
|
logs.push(str);
|
||||||
this.logsService.recordLog(PipelineTaskLogMessage.create(task, str));
|
this.logsService.recordLog(
|
||||||
|
PipelineTaskLogMessage.create(task, unit, str),
|
||||||
|
);
|
||||||
});
|
});
|
||||||
sub.addListener('close', (code) => {
|
sub.addListener('close', (code) => {
|
||||||
if (code === 0) {
|
if (code === 0) {
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import { Resolver, Args, Mutation, Subscription } from '@nestjs/graphql';
|
import { Resolver, Args, Mutation, Subscription, Query } from '@nestjs/graphql';
|
||||||
import { PipelineTask } from './pipeline-task.entity';
|
import { PipelineTask } from './pipeline-task.entity';
|
||||||
import { PipelineTasksService } from './pipeline-tasks.service';
|
import { PipelineTasksService } from './pipeline-tasks.service';
|
||||||
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
import { CreatePipelineTaskInput } from './dtos/create-pipeline-task.input';
|
||||||
@ -18,10 +18,24 @@ export class PipelineTasksResolver {
|
|||||||
return await this.service.addTask(taskDto);
|
return await this.service.addTask(taskDto);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Subscription(() => PipelineTaskLogMessage)
|
@Subscription(() => PipelineTaskLogMessage, {
|
||||||
|
resolve: (value) => {
|
||||||
|
return value;
|
||||||
|
},
|
||||||
|
})
|
||||||
async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) {
|
async pipelineTaskLog(@Args() args: PipelineTaskLogArgs) {
|
||||||
const task = await this.service.findTaskById(args.taskId);
|
const task = await this.service.findTaskById(args.taskId);
|
||||||
const asyncIterator = this.logsService.watchLogs(task);
|
const asyncIterator = this.logsService.watchLogs(task);
|
||||||
return asyncIterator;
|
return asyncIterator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Query(() => [PipelineTask])
|
||||||
|
async listPipelineTaskByPipelineId(@Args('pipelineId') pipelineId: string) {
|
||||||
|
return await this.service.listTasksByPipelineId(pipelineId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Query(() => PipelineTask)
|
||||||
|
async findPipelineTask(@Args('id') id: string) {
|
||||||
|
return await this.service.findTaskById(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,10 @@ export class PipelineTasksService {
|
|||||||
return await this.repository.findOneOrFail({ id });
|
return await this.repository.findOneOrFail({ id });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async listTasksByPipelineId(pipelineId: string) {
|
||||||
|
return await this.repository.find({ pipelineId });
|
||||||
|
}
|
||||||
|
|
||||||
async doNextTask(pipeline: Pipeline) {
|
async doNextTask(pipeline: Pipeline) {
|
||||||
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
|
const [lckKey, tasksKey] = this.getRedisTokens(pipeline);
|
||||||
const redis = this.redis.getClient();
|
const redis = this.redis.getClient();
|
||||||
|
Loading…
Reference in New Issue
Block a user