Skip to content

Commit

Permalink
feat(Muse, Radio & Music): Migrate to tcp services & keep count of us…
Browse files Browse the repository at this point in the history
…ed instances by hand
  • Loading branch information
jurienhamaker committed Jun 26, 2024
1 parent 002bfde commit eecd2a4
Show file tree
Hide file tree
Showing 39 changed files with 773 additions and 639 deletions.
17 changes: 6 additions & 11 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ EXPOSED_API_PORT=3000
EXPOSED_DASHBOARD_PORT=3001
EXPOSED_DB_PORT=5432
EXPOSED_PROM_PORT=9090
EXPOSED_RABBIT_MANAGEMENT_PORT=15672

EXPOSED_MUSE_DEBUGGING_PORT=9229
EXPOSED_RADIO_DEBUGGING_PORT=9230
Expand All @@ -21,25 +20,21 @@ DISCORD_OAUTH_CLIENT_SECRET=
DISCORD_OAUTH_SCOPES=identify,guilds
DISCORD_OAUTH_CALLBACK_URL=http://localhost:3000/auth/callback

MC_RCON_HOST=
MC_RCON_PORT=
MC_RCON_PASS=

DISCORD_TOKEN=
RADIO_DISCORD_TOKEN=
MUSIC_DISCORD_TOKEN=
MUSIC_1_DISCORD_TOKEN=

SENTRY_DNS=
RADIO_BOT_HOST=radio
MUSIC_BOT_HOSTS=music

LOKI_URL=https://<username>:<password>@<host>/loki/api/v1/push
SENTRY_DNS=

RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
LOKI_URL=https:// <username >: <password >@ <host >/loki/api/v1/push

LAVALINK_ID=Main
LAVALINK_HOST=lavalink
LAVALINK_PORT=2333
LAVALINK_PASSWORD=docker

SPOTIFY_CLIENT_ID=
SPOTIFY_CLIENT_SECRET=
SPOTIFY_CLIENT_SECRET=
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {
"@nx/enforce-module-boundaries": false
"@nx/enforce-module-boundaries": 0
}
},
{
Expand Down
10 changes: 10 additions & 0 deletions apps/muse/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@util';
import { MuseModule } from './muse.module';

import { ShutdownSignal } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
const app = await NestFactory.create(MuseModule, {
Expand All @@ -13,6 +14,14 @@ async function bootstrap() {
app.setGlobalPrefix('api');
app.enableShutdownHooks(Object.values(ShutdownSignal));

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: {
host: '0.0.0.0',
port: 1337,
},
});

if (process.env.NODE_ENV !== 'production') {
const config = new DocumentBuilder()
.setTitle('Muse API')
Expand All @@ -25,6 +34,7 @@ async function bootstrap() {
SwaggerModule.setup('swagger', app, document);
}

await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { LavalinkMusicEvent } from '@music';
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { PlayerState } from 'kazagumo';
import { MusicInstancesService } from '../services/instances.service';

@Controller()
export class MusicController {
private readonly _logger = new Logger(MusicController.name);

constructor(private _instances: MusicInstancesService) {}

@EventPattern('MUSIC_INSTANCE_BOOTED')
booted(
@Payload()
{ instance }: { instance: number },
) {
this._logger.log(`Music instance booted up ${instance}`);
this._instances.clearInstance(instance);
}

@EventPattern('MUSIC_STATE_CHANGED')
stateChanged(
@Payload()
{
instance,
guildId,
state,
}: {
instance: number;
guildId: string;
state: PlayerState;
},
) {
this._logger.log(`Music instance state changed ${instance} - ${state}`);
this._instances.updateState(instance, guildId, state);
}

@EventPattern('MUSIC_CONNECTED')
connected(
@Payload()
{
instance,
guildId,
voiceChannelId,
}: { instance: number } & LavalinkMusicEvent,
) {
this._logger.log(`Received music connected message for ${instance}`);
this._instances.connect(instance, guildId, voiceChannelId);
}

@EventPattern('MUSIC_DISCONNECTED')
disconnected(
@Payload()
{ instance, guildId }: { instance: number } & LavalinkMusicEvent,
) {
this._logger.log(`Received music disconnected message for ${instance}`);
this._instances.disconnect(instance, guildId);
}
}
28 changes: 13 additions & 15 deletions apps/muse/src/modules/discord/music/music.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,29 @@ import { MusicSettingsCommands } from './commands/settings.commands';
import { MusicShuffleCommands } from './commands/shuffle.command';
import { MusicStopCommands } from './commands/stop.command';
import { MusicVolumeCommands } from './commands/volume.command';
import { MusicController } from './controllers/music.controller';
import { MusicService } from './services';
import { MusicInstancesService } from './services/instances.service';
import { MusicSettingsService } from './services/settings.service';

const musicBotHosts = process.env.MUSIC_BOT_HOSTS.split(',');
@Module({
imports: [
ClientsModule.register([
{
name: 'MUSIC_SERVICE',
transport: Transport.RMQ,
ClientsModule.register(
musicBotHosts.map((host, i) => ({
name: `MUSIC_SERVICE_${i + 1}`,
transport: Transport.TCP,
options: {
urls: [
`amqp://${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`,
],
noAck: false,
queue: 'music_queue',
queueOptions: {
durable: false,
},
host,
port: 1337,
},
},
]),
})),
),
SharedModule,
],
controllers: [],
controllers: [MusicController],
providers: [
MusicInstancesService,
MusicService,
MusicSettingsService,
MusicSettingsCommands,
Expand Down
1 change: 1 addition & 0 deletions apps/muse/src/modules/discord/music/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './instances.service';
export * from './music.service';
export * from './settings.service';
199 changes: 199 additions & 0 deletions apps/muse/src/modules/discord/music/services/instances.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import {
Inject,
Injectable,
Logger,
OnModuleInit,
Optional,
} from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Cron } from '@nestjs/schedule';
import { PrismaService } from '@prisma';
import { PlayerState } from 'kazagumo';
import { firstValueFrom, take, timeout } from 'rxjs';

@Injectable()
export class MusicInstancesService implements OnModuleInit {
private readonly _logger = new Logger(MusicInstancesService.name);

private _instances: ClientProxy[] = [];

constructor(
private _prisma: PrismaService,
@Optional() @Inject('MUSIC_SERVICE_1') private _music: ClientProxy,
@Optional() @Inject('MUSIC_SERVICE_2') private _music2: ClientProxy,
@Optional() @Inject('MUSIC_SERVICE_3') private _music3: ClientProxy,
) {
this._instances.push(this._music);
this._instances.push(this._music2);
this._instances.push(this._music3);
}

onModuleInit() {
setTimeout(() => this._checkInstances(), 2000);
}

async getAvailableOrExisting(guildId: string, voiceChannelId: string) {
const exists = await this.getByVoiceId(guildId, voiceChannelId);
if (exists) {
return exists;
}

const all = await this._prisma.musicServiceMap.findMany({
where: {
guildId,
},
});

if (all.length === this._instances.length) {
return null;
}

const available: number[] = this._instances
.filter((i) => !!i)
.map((_, i) => i + 1);
for (const entry of all) {
const index = available.indexOf(entry.instance);
available.splice(index, 1);
}

if (!available.length) {
return null;
}

return available[0];
}

async getByVoiceId(guildId: string, voiceChannelId: string) {
const current = await this._prisma.musicServiceMap.findFirst({
where: {
guildId,
voiceChannelId,
},
});

if (!current) {
return null;
}

return current.instance;
}

get(instance: number) {
if (instance < 1) {
instance = 1;
}

return this._instances[instance - 1];
}

async connect(instance: number, guildId: string, voiceChannelId: string) {
await this.disconnect(instance, guildId);

const result = await this.sendCommand<{ state: number }>(
instance,
'MUSIC_STATUS',
{
guildId,
},
);

await this._prisma.musicServiceMap.create({
data: {
guildId,
voiceChannelId,
state: result.state,
instance,
},
});
}

async disconnect(instance: number, guildId: string) {
const hasOne = await this._prisma.musicServiceMap.findFirst({
where: {
guildId,
instance,
},
});

if (hasOne) {
await this._prisma.musicServiceMap.delete({
where: {
id: hasOne.id,
},
});
}
}

async clearInstance(instance: number) {
await this._prisma.musicServiceMap.deleteMany({
where: {
instance,
},
});
}

async sendCommand<T = any>(instance: number, command: string, data: any) {
const _instance = this.get(instance);

if (!_instance) {
return null;
}

const result = await (firstValueFrom(
_instance.send(command, data).pipe(take(1), timeout(5000)),
) as Promise<T & { result: string }>);

return result;
}

async updateState(instance: number, guildId: string, state: PlayerState) {
const entry = await this._prisma.musicServiceMap.findFirst({
where: {
instance,
guildId,
},
});

if (!entry) {
return null;
}

return this._prisma.musicServiceMap.update({
where: {
id: entry.id,
},
data: {
state,
},
});
}

@Cron('0 */10 * * * *')
protected async _checkInstances() {
this._logger.log('Checking alive instances');
const unfinished = await this._prisma.musicServiceMap.findMany();

for (const entry of unfinished) {
const result = await this.sendCommand<{ state: number }>(
entry.instance,
'MUSIC_STATUS',
{
guildId: entry.guildId,
},
).catch(() => null);

if (
!result ||
[
PlayerState.DISCONNECTED,
PlayerState.DISCONNECTING,
PlayerState.DESTROYING,
PlayerState.DESTROYED,
-1,
].indexOf(result.state) >= 0
) {
this.disconnect(entry.instance, entry.guildId);
}
}
}
}
Loading

0 comments on commit eecd2a4

Please sign in to comment.