Skip to content

Commit

Permalink
feat(back): Jobs launch and synchronize (#55)
Browse files Browse the repository at this point in the history
* feat(worker/gmail): Setup basic email sender and Docker

* feat(back): google send mail reaction

* fix: adjustments

* fix: send_email constant unused

* fix: workflow step id in python file

* feat(synchronize): kill all jobs on production when starting supervisor

* feat(back): synchronize jobs, first areas migration, job rename

* feat(back): area workflows chained

* fix(back): docker environment and synchronize killAll

* fix(back): debug print in gmail py

* fix: removed migration

---------

Co-authored-by: Reza Rahemtola <[email protected]>
  • Loading branch information
EdenComp and RezaRahemtola authored Oct 9, 2023
1 parent f9ef3ea commit 45c1960
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 85 deletions.
13 changes: 8 additions & 5 deletions backend/back/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { Controller } from "@nestjs/common";
import { Controller, forwardRef, Inject } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { ApiExcludeController } from "@nestjs/swagger";
import { JobData } from "./grpc.dto";
import { JobsService } from "../jobs/jobs.service";

@ApiExcludeController()
@Controller()
export class GrpcController {
constructor(@Inject(forwardRef(() => JobsService)) private readonly jobsService: JobsService) {}

@GrpcMethod("AreaBackService", "OnAction")
onAction(data: JobData): void {
console.log("OnAction", data);
async onAction(data: JobData): Promise<void> {
await this.jobsService.launchNextJob(data);
}

@GrpcMethod("AreaBackService", "OnReaction")
onReaction(data: JobData): void {
console.log("OnReaction", data);
async onReaction(data: JobData): Promise<void> {
await this.jobsService.launchNextJob(data);
}
}
4 changes: 3 additions & 1 deletion backend/back/src/grpc/grpc.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Module } from "@nestjs/common";
import { forwardRef, Module } from "@nestjs/common";
import { GrpcController } from "./grpc.controller";
import { GrpcService } from "./grpc.service";
import { ClientsModule, Transport } from "@nestjs/microservices";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { JobsModule } from "../jobs/jobs.module";

@Module({
imports: [
Expand All @@ -24,6 +25,7 @@ import { ConfigModule, ConfigService } from "@nestjs/config";
}),
},
]),
forwardRef(() => JobsModule),
],
controllers: [GrpcController],
providers: [GrpcService],
Expand Down
19 changes: 12 additions & 7 deletions backend/back/src/grpc/grpc.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
import { Inject, Injectable, OnModuleInit } from "@nestjs/common";
import { forwardRef, Inject, Injectable, OnModuleInit } from "@nestjs/common";
import { ClientGrpc } from "@nestjs/microservices";
import { GrpcResponse, JobData, JobId, JobList } from "./grpc.dto";
import { firstValueFrom, Observable } from "rxjs";
import { JobsParams, JobsType } from "../types/jobs";
import { JobsIdentifiers } from "../types/jobIds";
import "../types/struct";
import { JobsService } from "../jobs/jobs.service";

interface AreaSupervisorService {
launchJob(data: JobData): Observable<GrpcResponse>;
killJob(job: JobId): Observable<GrpcResponse>;
killAllJobs(): Observable<GrpcResponse>;
listJobs(): Observable<JobList>;
killAllJobs(_: object): Observable<GrpcResponse>;
listJobs(_: object): Observable<JobList>;
}

@Injectable()
export class GrpcService implements OnModuleInit {
private areaSupervisorService: AreaSupervisorService;

constructor(@Inject("AREA_SUPERVISOR_PACKAGE") private readonly client: ClientGrpc) {}
constructor(
@Inject("AREA_SUPERVISOR_PACKAGE") private readonly client: ClientGrpc,
@Inject(forwardRef(() => JobsService)) private readonly jobsService: JobsService,
) {}

onModuleInit() {
async onModuleInit() {
this.areaSupervisorService = this.client.getService<AreaSupervisorService>("AreaSupervisorService");
await this.jobsService.synchronizeJobs();
}

launchJob<TJob extends JobsType, TParams extends JobsParams["mappings"][TJob]>(
Expand All @@ -46,10 +51,10 @@ export class GrpcService implements OnModuleInit {
}

killAllJobs() {
return firstValueFrom(this.areaSupervisorService.killAllJobs());
return firstValueFrom(this.areaSupervisorService.killAllJobs({}));
}

listJobs() {
return firstValueFrom(this.areaSupervisorService.listJobs());
return firstValueFrom(this.areaSupervisorService.listJobs({}));
}
}
13 changes: 0 additions & 13 deletions backend/back/src/jobs/jobs.dto.ts

This file was deleted.

2 changes: 1 addition & 1 deletion backend/back/src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TypeOrmModule } from "@nestjs/typeorm";
import WorkflowArea from "../workflows/entities/workflow-area.entity";

@Module({
imports: [GrpcModule, WorkflowsModule, TypeOrmModule.forFeature([WorkflowArea]), forwardRef(() => WorkflowsModule)],
imports: [TypeOrmModule.forFeature([WorkflowArea]), forwardRef(() => WorkflowsModule), forwardRef(() => GrpcModule)],
controllers: [],
providers: [JobsService],
exports: [JobsService],
Expand Down
69 changes: 49 additions & 20 deletions backend/back/src/jobs/jobs.service.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
import { BadRequestException, HttpException, Injectable } from "@nestjs/common";
import { BadRequestException, forwardRef, Inject, Injectable } from "@nestjs/common";
import { GrpcService } from "../grpc/grpc.service";
import { LaunchJobDto } from "./jobs.dto";
import { JobParamsClasses, JobsParams, JobsType } from "../types/jobs";
import { validate } from "class-validator";
import { plainToInstance } from "class-transformer";
import { WorkflowsService } from "../workflows/workflows.service";
import { InjectRepository } from "@nestjs/typeorm";
import WorkflowArea from "../workflows/entities/workflow-area.entity";
import { Repository } from "typeorm";
import { JobData } from "../grpc/grpc.dto";
import { RuntimeException } from "@nestjs/core/errors/exceptions";

@Injectable()
export class JobsService {
constructor(
private readonly grpcService: GrpcService,
private readonly workflowsService: WorkflowsService,
@InjectRepository(WorkflowArea)
private readonly workflowAreaRepository: Repository<WorkflowArea>,
@Inject(forwardRef(() => GrpcService)) private readonly grpcService: GrpcService,
@Inject(forwardRef(() => WorkflowsService)) private readonly workflowsService: WorkflowsService,
@InjectRepository(WorkflowArea) private readonly workflowAreaRepository: Repository<WorkflowArea>,
) {}

async getActionJobsToStart() {
async getActionJobsToStart(): Promise<JobData[]> {
return (await this.workflowsService.getWorkflowsWithAreas(undefined, true)).map(
({ action: { jobId, parameters } }) => ({ jobId, params: parameters }),
({ action: { areaId, areaServiceId, jobId: identifier, parameters: params } }) => ({
name: `${areaServiceId}-${areaId}`,
identifier,
params,
}),
);
}

async getReactionsForJob(jobId: string) {
return (
await this.workflowAreaRepository.find({
where: { jobId },
relations: { nextWorkflowReactions: true },
})
).map(({ nextWorkflowReactions }) => nextWorkflowReactions.map(({ jobId, parameters }) => ({ jobId, parameters })));
async getReactionsForJob(jobId: string): Promise<JobData[]> {
const jobs = await this.workflowAreaRepository.find({
where: { jobId },
relations: { nextWorkflowReactions: { area: true } },
});

return jobs.flatMap(({ nextWorkflowReactions }) =>
nextWorkflowReactions.map(({ jobId: identifier, parameters: params, area }) => ({
name: `${area.serviceId}-${area.id}`,
identifier,
params,
})),
);
}

async convertParams<TJobs extends JobsType>(job: JobsType, params: unknown): Promise<JobsParams["mappings"][TJobs]> {
Expand All @@ -55,12 +65,31 @@ export class JobsService {
return data as JobsParams["mappings"][TJobs];
}

async launchJob(job: LaunchJobDto): Promise<void> {
const params = await this.convertParams(job.job, job.params);
const response = await this.grpcService.launchJob(job.job, params);
async launchJobs(jobs: JobData[]): Promise<void> {
for (const job of jobs) {
const jobType: JobsType = job.name as JobsType;
const params = await this.convertParams(jobType as JobsType, job.params);
const response = await this.grpcService.launchJob(jobType, params);

if (response.error) {
// TODO: Error handling in db & front
throw new RuntimeException(`Error while launching job: ${response.error.message}`);
}
}
}

async launchNextJob(data: JobData): Promise<void> {
const jobs = await this.getReactionsForJob(data.identifier);
await this.launchJobs(jobs);
}

async synchronizeJobs(): Promise<void> {
const jobs = await this.getActionJobsToStart();

if (response.error) {
throw new HttpException(response.error.message, response.error.code);
const res = await this.grpcService.killAllJobs();
if (res.error) {
throw new RuntimeException(`Error while synchronizing: ${res.error.message}`);
}
await this.launchJobs(jobs);
}
}
2 changes: 1 addition & 1 deletion backend/back/src/types/jobIds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ type JobsIdentifiers = {
};

export const JobsIdentifiers: JobsIdentifiers = {
"seconds-interval": ({ workflowStepId }) => `seconds-interval-${workflowStepId}`,
"timer-seconds-interval": ({ workflowStepId }) => `timer-seconds-interval-${workflowStepId}`,
"google-send-email": ({ workflowStepId }) => `google-send-email-${workflowStepId}`,
};
2 changes: 1 addition & 1 deletion backend/back/src/types/jobParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class UniqueJobParams {
workflowStepId: string;
}

export class SecondIntervalParams extends UniqueJobParams {
export class TimerSecondIntervalParams extends UniqueJobParams {
@IsNumber()
seconds: number;
}
Expand Down
8 changes: 4 additions & 4 deletions backend/back/src/types/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { GoogleSendEmailParams, SecondIntervalParams } from "./jobParams";
import { GoogleSendEmailParams, TimerSecondIntervalParams } from "./jobParams";

export enum Jobs {
"seconds-interval" = "seconds-interval",
"timer-seconds-interval" = "timer-seconds-interval",
"google-send-email" = "google-send-email",
}

Expand All @@ -12,11 +12,11 @@ interface Mapper<TMappings extends Record<JobsType, object>> {
}

export const JobParamsClasses = {
"seconds-interval": SecondIntervalParams,
"timer-seconds-interval": TimerSecondIntervalParams,
"google-send-email": GoogleSendEmailParams,
};

export type JobsParams = Mapper<{
"seconds-interval": SecondIntervalParams;
"timer-seconds-interval": TimerSecondIntervalParams;
"google-send-email": GoogleSendEmailParams;
}>;
6 changes: 6 additions & 0 deletions backend/supervisor/.env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
GO_ENV=development

GRPC_SERVER_PORT=50051

GRPC_CALLBACK_HOST=localhost
GRPC_CALLBACK_PORT=50050

GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GOOGLE_TOKEN_URI=
7 changes: 6 additions & 1 deletion backend/supervisor/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ services:

gmail:
container_name: gmail
build: ../workers/gmail
build:
context: ../workers/gmail
args:
- GOOGLE_CLIENT_ID=${GOOGLE_CLIENT_ID}
- GOOGLE_CLIENT_SECRET=${GOOGLE_CLIENT_SECRET}
- GOOGLE_TOKEN_URI=${GOOGLE_TOKEN_URI}
18 changes: 16 additions & 2 deletions backend/supervisor/jobs/images.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
package jobs

var JobToImage = map[string]string{
"seconds-interval": "seconds-interval",
"google-send-email": "gmail",
"timer-seconds-interval": "supervisor-seconds-interval",
"google-send-email": "supervisor-gmail",
}

var OptArgument = map[string]string{
"timer-seconds-interval": "",
"google-send-email": "send-email",
}

func GetImages() []string {
v := make([]string, 0, len(JobToImage))

for _, value := range JobToImage {
v = append(v, value)
}
return v
}
Loading

0 comments on commit 45c1960

Please sign in to comment.