Skip to content

Commit

Permalink
refactor(queue): use native rpc implementation of rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
nicotsx committed Dec 7, 2024
1 parent f25d525 commit 12bcd22
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 151 deletions.
1 change: 0 additions & 1 deletion packages/backend/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ export default async () => {
["./modules/system/dto/system.dto"]: await import("./modules/system/dto/system.dto")
};
return { "@nestjs/swagger": { "models": [[import("./modules/user/dto/user.dto"), { "UserDto": {} }], [import("./modules/marketplace/dto/marketplace.dto"), { "AppInfoSimpleDto": {}, "AppInfoDto": {}, "UpdateInfoDto": {}, "SearchAppsQueryDto": {}, "SearchAppsDto": {}, "AppDetailsDto": {} }], [import("./app.dto"), { "UserSettingsDto": {}, "PartialUserSettingsDto": {}, "AppContextDto": {}, "UserContextDto": {}, "AcknowledgeWelcomeBody": {} }], [import("./modules/queue/queue.entity"), { "Queue": { queueNameResponse: { required: true, type: () => String }, activeTasks: { required: true, type: () => Number }, taskQueue: { required: true, type: () => [Object] }, callbacks: { required: true } } }], [import("./modules/auth/dto/auth.dto"), { "LoginBody": {}, "VerifyTotpBody": {}, "LoginDto": {}, "RegisterBody": {}, "RegisterDto": {}, "ChangeUsernameBody": {}, "ChangePasswordBody": {}, "GetTotpUriBody": {}, "GetTotpUriDto": {}, "SetupTotpBody": {}, "DisableTotpBody": {}, "ResetPasswordBody": {}, "ResetPasswordDto": {}, "CheckResetPasswordRequestDto": {} }], [import("./modules/app-lifecycle/dto/app-lifecycle.dto"), { "AppFormBody": {}, "UninstallAppBody": {}, "UpdateAppBody": {} }], [import("./modules/app-stores/dto/app-store.dto"), { "PullDto": {}, "AllAppStoresDto": {} }], [import("./modules/apps/dto/app.dto"), { "AppDto": {}, "MyAppsDto": {}, "GuestAppsDto": {}, "GetAppDto": {} }], [import("./modules/backups/dto/backups.dto"), { "BackupDto": {}, "RestoreAppBackupDto": {}, "GetAppBackupsDto": {}, "GetAppBackupsQueryDto": {}, "DeleteAppBackupBodyDto": {} }], [import("./modules/links/dto/links.dto"), { "LinkBodyDto": {}, "EditLinkBodyDto": {}, "LinksDto": {} }], [import("./modules/system/dto/system.dto"), { "LoadDto": {} }]], "controllers": [[import("./app.controller"), { "AppController": { "userContext": { type: t["./app.dto"].UserContextDto }, "appContext": { type: t["./app.dto"].AppContextDto }, "updateUserSettings": {}, "acknowledgeWelcome": {} } }], [import("./modules/auth/auth.controller"), { "AuthController": { "login": { type: t["./modules/auth/dto/auth.dto"].LoginDto }, "verifyTotp": { type: t["./modules/auth/dto/auth.dto"].LoginDto }, "register": { type: t["./modules/auth/dto/auth.dto"].RegisterDto }, "logout": {}, "changeUsername": {}, "changePassword": {}, "getTotpUri": { type: t["./modules/auth/dto/auth.dto"].GetTotpUriDto }, "setupTotp": {}, "disableTotp": {}, "resetPassword": { type: t["./modules/auth/dto/auth.dto"].ResetPasswordDto }, "cancelResetPassword": {}, "checkResetPasswordRequest": { type: t["./modules/auth/dto/auth.dto"].CheckResetPasswordRequestDto } } }], [import("./modules/i18n/i18n.controller"), { "I18nController": { "getTranslation": { type: Object } } }], [import("./core/health/health.controller"), { "HealthController": { "check": { type: Object } } }], [import("./modules/app-stores/app-store.controller"), { "AppStoreController": { "pull": { type: t["./modules/app-stores/dto/app-store.dto"].PullDto }, "getAll": { type: t["./modules/app-stores/dto/app-store.dto"].AllAppStoresDto } } }], [import("./modules/marketplace/marketplace.controller"), { "MarketplaceController": { "searchApps": { type: t["./modules/marketplace/dto/marketplace.dto"].SearchAppsDto }, "getAppDetails": { type: t["./modules/marketplace/dto/marketplace.dto"].AppDetailsDto }, "getImage": {} } }], [import("./modules/apps/apps.controller"), { "AppsController": { "getInstalledApps": { type: t["./modules/apps/dto/app.dto"].MyAppsDto }, "getGuestApps": { type: t["./modules/apps/dto/app.dto"].GuestAppsDto }, "getApp": { type: t["./modules/apps/dto/app.dto"].GetAppDto } } }], [import("./modules/backups/backups.controller"), { "BackupsController": { "backupApp": {}, "restoreAppBackup": {}, "getAppBackups": { type: t["./modules/backups/dto/backups.dto"].GetAppBackupsDto }, "deleteAppBackup": {} } }], [import("./modules/app-lifecycle/app-lifecycle.controller"), { "AppLifecycleController": { "installApp": {}, "startApp": {}, "stopApp": {}, "restartApp": {}, "uninstallApp": {}, "resetApp": {}, "updateApp": {}, "updateAllApps": {}, "updateAppConfig": {} } }], [import("./modules/links/links.controller"), { "LinksController": { "getLinks": { type: t["./modules/links/dto/links.dto"].LinksDto }, "createLink": {}, "editLink": {}, "deleteLink": {} } }], [import("./modules/system/system.controller"), { "SystemController": { "systemLoad": { type: t["./modules/system/dto/system.dto"].LoadDto }, "downloadLocalCertificate": {} } }]] } };
};
26 changes: 13 additions & 13 deletions packages/backend/src/modules/app-lifecycle/app-lifecycle.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { AppsRepository } from '../apps/apps.repository';
import { AppsService } from '../apps/apps.service';
import { BackupManager } from '../backups/backup.manager';
import { MarketplaceService } from '../marketplace/marketplace.service';
import { type AppEventFormInput, AppEventsQueue, appEventSchema } from '../queue/entities/app-events';
import { type AppEventFormInput, AppEventsQueue, appEventResultSchema, appEventSchema } from '../queue/entities/app-events';
import { AppLifecycleCommandFactory } from './app-lifecycle-command.factory';
import { appFormSchema } from './dto/app-lifecycle.dto';

Expand All @@ -32,17 +32,17 @@ export class AppLifecycleService {
private readonly backupManager: BackupManager,
) {
this.logger.debug('Subscribing to app events...');
this.appEventsQueue.onEvent(({ eventId, ...data }) => this.invokeCommand(eventId, data));
this.appEventsQueue.onEvent(({ ...data }, reply) => this.invokeCommand(data, reply));
}

async invokeCommand(eventId: string, data: z.infer<typeof appEventSchema>) {
async invokeCommand(data: z.infer<typeof appEventSchema>, reply: (response: z.output<typeof appEventResultSchema>) => void) {
try {
const command = this.commandFactory.createCommand(data);
const { success, message } = await command.execute(data.appid, data.form);
await this.appEventsQueue.sendEventResponse(eventId, { success, message });
reply({ success, message });
} catch (err) {
this.logger.error(`Error invoking command: ${err}`);
await this.appEventsQueue.sendEventResponse(eventId, { success: false, message: String(err) });
reply({ success: false, message: String(err) });
}
}

Expand All @@ -57,7 +57,7 @@ export class AppLifecycleService {
await this.appRepository.updateApp(appId, { status: 'starting' });
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'starting' } });

this.appEventsQueue.publishAsync({ appid: appId, command: 'start', form: app.config }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ appid: appId, command: 'start', form: app.config }).then(async ({ success, message }) => {
if (success) {
this.logger.info(`App ${appId} started successfully`);
this.socketManager.emit({ type: 'app', event: 'start_success', data: { appId, appStatus: 'running' } });
Expand Down Expand Up @@ -141,7 +141,7 @@ export class AppLifecycleService {
});

// Send install command to the queue
this.appEventsQueue.publishAsync({ appid: appId, command: 'install', form }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ appid: appId, command: 'install', form }).then(async ({ success, message }) => {
if (success) {
this.logger.info(`App ${appId} installed successfully`);
await this.socketManager.emit({ type: 'app', event: 'install_success', data: { appId, appStatus: 'running' } });
Expand Down Expand Up @@ -170,7 +170,7 @@ export class AppLifecycleService {
await this.appRepository.updateApp(appId, { status: 'stopping' });

// Send stop command to the queue
this.appEventsQueue.publishAsync({ command: 'stop', appid: appId, form: app.config }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ command: 'stop', appid: appId, form: app.config }).then(async ({ success, message }) => {
if (success) {
this.socketManager.emit({ type: 'app', event: 'stop_success', data: { appId, appStatus: 'stopped' } });
this.logger.info(`App ${appId} stopped successfully`);
Expand All @@ -197,7 +197,7 @@ export class AppLifecycleService {
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'restarting' } });
await this.appRepository.updateApp(appId, { status: 'restarting' });

this.appEventsQueue.publishAsync({ command: 'restart', appid: appId, form: app.config }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ command: 'restart', appid: appId, form: app.config }).then(async ({ success, message }) => {
if (success) {
this.logger.info(`App ${appId} restarted successfully`);
this.socketManager.emit({ type: 'app', event: 'restart_success', data: { appId, appStatus: 'running' } });
Expand Down Expand Up @@ -229,7 +229,7 @@ export class AppLifecycleService {
await this.appRepository.updateApp(appId, { status: 'uninstalling' });
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'uninstalling' } });

this.appEventsQueue.publishAsync({ command: 'uninstall', appid: appId, form: app.config }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ command: 'uninstall', appid: appId, form: app.config }).then(async ({ success, message }) => {
if (success) {
this.logger.info(`App ${appId} uninstalled successfully`);
await this.appRepository.deleteApp(appId);
Expand Down Expand Up @@ -257,7 +257,7 @@ export class AppLifecycleService {
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'resetting' } });
await this.appRepository.updateApp(appId, { status: 'resetting' });

this.appEventsQueue.publishAsync({ command: 'reset', appid: appId, form: app.config }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ command: 'reset', appid: appId, form: app.config }).then(async ({ success, message }) => {
if (success) {
this.logger.info(`App ${appId} reset successfully`);
await this.socketManager.emit({ type: 'app', event: 'reset_success', data: { appId, appStatus: 'stopped' } });
Expand Down Expand Up @@ -317,7 +317,7 @@ export class AppLifecycleService {
}
}

const { success, message } = await this.appEventsQueue.publishAsync({
const { success, message } = await this.appEventsQueue.publish({
command: 'generate_env',
appid: appId,
form: parsedForm,
Expand Down Expand Up @@ -358,7 +358,7 @@ export class AppLifecycleService {
const appStatusBeforeUpdate = app.status;
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'updating' } });

this.appEventsQueue.publishAsync({ command: 'update', appid: appId, form: app.config, performBackup }).then(async ({ success, message }) => {
this.appEventsQueue.publish({ command: 'update', appid: appId, form: app.config, performBackup }).then(async ({ success, message }) => {
if (success) {
const appInfo = await this.appFilesManager.getInstalledAppInfo(appId);

Expand Down
10 changes: 5 additions & 5 deletions packages/backend/src/modules/app-stores/app-store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,32 @@ export class AppStoreService {
private readonly config: ConfigurationService,
private readonly appStoreRepository: AppStoreRepository,
) {
this.repoQueue.onEvent(async (data) => {
this.repoQueue.onEvent(async (data, reply) => {
switch (data.command) {
case 'update_all': {
const stores = await this.appStoreRepository.getEnabledAppStores();
for (const store of stores) {
await this.repoHelpers.pullRepo(store.url, store.id.toString());
}
this.repoQueue.sendEventResponse(data.eventId, { success: true, message: 'All repos updated' });
reply({ success: true, message: 'All repos updated' });
break;
}
case 'clone_all': {
const stores = await this.appStoreRepository.getEnabledAppStores();
for (const store of stores) {
await this.repoHelpers.cloneRepo(store.url, store.id.toString());
}
this.repoQueue.sendEventResponse(data.eventId, { success: true, message: 'All repos cloned' });
reply({ success: true, message: 'All repos cloned' });
break;
}
case 'clone': {
const { success, message } = await this.repoHelpers.cloneRepo(data.url, data.id);
this.repoQueue.sendEventResponse(data.eventId, { success, message });
reply({ success, message });
break;
}
case 'update': {
const { success, message } = await this.repoHelpers.pullRepo(data.url, data.id);
this.repoQueue.sendEventResponse(data.eventId, { success, message });
reply({ success, message });
break;
}
}
Expand Down
32 changes: 15 additions & 17 deletions packages/backend/src/modules/backups/backups.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class BackupsService {
await this.appsRepository.updateApp(appId, { status: 'backing_up' });
this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'starting' } });

this.appEventsQueue.publishAsync({ appid: appId, command: 'backup', form: app.config }, 1000 * 60 * 15).then(async ({ success, message }) => {
this.appEventsQueue.publish({ appid: appId, command: 'backup', form: app.config }).then(async ({ success, message }) => {
if (success) {
if (appStatusBeforeUpdate === 'running') {
await this.appLifecycle.startApp({ appId });
Expand Down Expand Up @@ -69,24 +69,22 @@ export class BackupsService {
await this.appsRepository.updateApp(appId, { status: 'restoring' });
await this.socketManager.emit({ type: 'app', event: 'status_change', data: { appId, appStatus: 'restoring' } });

this.appEventsQueue
.publishAsync({ appid: appId, command: 'restore', filename, form: app.config }, 1000 * 60 * 15)
.then(async ({ success, message }) => {
if (success) {
const restoredAppConfig = await this.appFilesManager.getInstalledAppInfo(appId);
await this.appsRepository.updateApp(appId, { version: restoredAppConfig?.tipi_version });

if (appStatusBeforeUpdate === 'running') {
await this.appLifecycle.startApp({ appId });
} else {
await this.appsRepository.updateApp(appId, { status: appStatusBeforeUpdate });
this.socketManager.emit({ type: 'app', event: 'restore_success', data: { appId, appStatus: 'stopped' } });
}
this.appEventsQueue.publish({ appid: appId, command: 'restore', filename, form: app.config }).then(async ({ success, message }) => {
if (success) {
const restoredAppConfig = await this.appFilesManager.getInstalledAppInfo(appId);
await this.appsRepository.updateApp(appId, { version: restoredAppConfig?.tipi_version });

if (appStatusBeforeUpdate === 'running') {
await this.appLifecycle.startApp({ appId });
} else {
this.logger.error(`Failed to restore app ${appId}: ${message}`);
await this.appsRepository.updateApp(appId, { status: 'stopped' });
await this.appsRepository.updateApp(appId, { status: appStatusBeforeUpdate });
this.socketManager.emit({ type: 'app', event: 'restore_success', data: { appId, appStatus: 'stopped' } });
}
});
} else {
this.logger.error(`Failed to restore app ${appId}: ${message}`);
await this.appsRepository.updateApp(appId, { status: 'stopped' });
}
});
}

public async getAppBackups(params: { appId: string; page: number; pageSize: number }) {
Expand Down
Loading

0 comments on commit 12bcd22

Please sign in to comment.