Skip to content

Commit

Permalink
MqttClientService.cs added in order to wrap the connection to the mqt…
Browse files Browse the repository at this point in the history
…t client and keep reference to it
  • Loading branch information
juuwel committed Jun 5, 2024
1 parent 6f9fa80 commit 1d7414c
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Shared/Dtos/CreateConditionsLogDto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public class CreateConditionsLogDto
public double Light { get; set; }
public double Temperature { get; set; }
public double Humidity { get; set; }
public required string DeviceId { get; set; }
public required long DeviceId { get; set; }
}
10 changes: 8 additions & 2 deletions api/Core/Services/ConditionsLogsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,20 @@ public async Task CreateConditionsLogAsync(CreateConditionsLogDto createConditio
Plants = allPlants
});

if (newMood != recentMood)
var moodDto = new MoodDto
{
Mood = newMood
};
await mqttPublisherService.PublishAsync(moodDto, createConditionsLogDto.DeviceId.ToString());

/*if (newMood != recentMood)
{
var moodDto = new MoodDto
{
Mood = newMood
};
await mqttPublisherService.PublishAsync(moodDto, createConditionsLogDto.DeviceId);
}
}*/
}

// TODO: make this more sensitive
Expand Down
34 changes: 34 additions & 0 deletions api/Core/Services/MqttClientService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using api.Core.Options;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Client;

namespace api.Core.Services;

public class MqttClientService
{
private readonly IMqttClient _mqttClient;
private readonly IOptions<MqttOptions> _options;

public MqttClientService(IOptions<MqttOptions> options)
{
_options = options;
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
}

public async Task ConnectAsync()
{
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_options.Value.Server, _options.Value.Port)
.WithCredentials(_options.Value.Username)
.Build();

await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
}

public IMqttClient GetClient()
{
return _mqttClient;
}
}
18 changes: 5 additions & 13 deletions api/Core/Services/MqttPublisherService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,28 @@ public class MqttPublisherService
{
private readonly IOptions<MqttOptions> _options;
private readonly JsonSerializerOptions _jsonSerializerOptions;
private readonly IMqttClient _mqttClient;


public MqttPublisherService(IOptions<MqttOptions> options)
public MqttPublisherService(IOptions<MqttOptions> options, MqttClientService mqttClientService)
{
_options = options;
_jsonSerializerOptions = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

_mqttClient = mqttClientService.GetClient();

if (string.IsNullOrEmpty(_options.Value.Username) || _options.Value.Username == "FILL_ME_IN")
throw new Exception("MQTT username not set in appsettings.json");
}

public async Task PublishAsync(MoodDto mood, string deviceId)
{
var mqttFactory = new MqttFactory();

using var mqttClient = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_options.Value.Server, _options.Value.Port)
.WithCredentials(_options.Value.Username)
.Build();

await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

var mqttPublishOptions = new MqttApplicationMessageBuilder()
.WithTopic($"{_options.Value.PublishTopic}/{deviceId}")
.WithPayload(JsonSerializer.Serialize(mood, options: _jsonSerializerOptions))
.WithRetainFlag()
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();

await mqttClient.PublishAsync(mqttPublishOptions, CancellationToken.None);
await _mqttClient.PublishAsync(mqttPublishOptions, CancellationToken.None);
}
}
24 changes: 7 additions & 17 deletions api/Core/Services/MqttSubscriberService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,24 @@ public class MqttSubscriberService
private readonly IOptions<MqttOptions> _options;
private readonly ConditionsLogsService _conditionsLogService;
private readonly JsonSerializerOptions _jsonSerializerOptions;
private readonly IMqttClient _mqttClient;

public MqttSubscriberService(IOptions<MqttOptions> options, ConditionsLogsService conditionsLogService)
public MqttSubscriberService(IOptions<MqttOptions> options, ConditionsLogsService conditionsLogService, MqttClientService mqttClientService)
{
_options = options;
_conditionsLogService = conditionsLogService;
_jsonSerializerOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
_mqttClient = mqttClientService.GetClient();

if (string.IsNullOrEmpty(_options.Value.Username) || _options.Value.Username == "FILL_ME_IN")
throw new Exception("MQTT username not set in appsettings.json");

_ = SubscribeAsync();
}

public async Task SubscribeAsync()
{
var mqttFactory = new MqttFactory();

using var mqttClient = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_options.Value.Server, _options.Value.Port)
.WithCredentials(_options.Value.Username)
.Build();

mqttClient.ApplicationMessageReceivedAsync += async e =>
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
Console.WriteLine("Received message");
var payload = Encoding.UTF8.GetString((ReadOnlySpan<byte>)e.ApplicationMessage.PayloadSegment);
var conditions = JsonSerializer.Deserialize<CreateConditionsLogDto>(payload, options:
_jsonSerializerOptions);
Expand All @@ -48,13 +41,10 @@ public async Task SubscribeAsync()
}
};

await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

var mqttFactory = new MqttFactory();
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder().WithTopicFilter(
f => { f.WithTopic(_options.Value.SubscribeTopic); }).Build();

await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

Console.ReadLine();
await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
}
}
6 changes: 3 additions & 3 deletions api/DbInitializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private async Task CreateHistoricalData()
await conditionsLogService.CreateConditionsLogAsync(
new CreateConditionsLogDto
{
DeviceId = _plants["Aloe Vera"].DeviceId!,
DeviceId = long.Parse(_plants["Aloe Vera"].DeviceId!),
SoilMoisturePercentage = GetValueNearOrInIdealRange(aloeVeraRequirements.SoilMoistureLevel),
Light = GetValueNearOrInIdealRange(aloeVeraRequirements.LightLevel),
Temperature = GetRandomTemperature(),
Expand All @@ -172,7 +172,7 @@ await conditionsLogService.CreateConditionsLogAsync(
await conditionsLogService.CreateConditionsLogAsync(
new CreateConditionsLogDto
{
DeviceId = _plants["Prickly Pear"].DeviceId!,
DeviceId = long.Parse(_plants["Prickly Pear"].DeviceId!),
SoilMoisturePercentage = GetValueNearOrInIdealRange(pricklyPearRequirements.SoilMoistureLevel),
Light = GetValueNearOrInIdealRange(pricklyPearRequirements.LightLevel),
Temperature = GetRandomTemperature(),
Expand All @@ -197,7 +197,7 @@ await conditionsLogService.CreateConditionsLogAsync(

new CreateConditionsLogDto
{
DeviceId = dyingPlant.DeviceId!,
DeviceId = long.Parse(dyingPlant.DeviceId!),
SoilMoisturePercentage = GetValueOutsideOfIdealRange(dyingPlant.Requirements!.SoilMoistureLevel),
Light = GetValueOutsideOfIdealRange(dyingPlant.Requirements!.LightLevel),
Temperature = dyingPlant.Requirements!.TemperatureLevel - 10,
Expand Down
8 changes: 6 additions & 2 deletions api/Extensions/AddServicesAndRepositoriesExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using api.Core.Services.External.BlobStorage;
using api.Events.Auth.Client;
using Infrastructure.Repositories;
using MQTTnet.Client;

namespace api.Extensions;

Expand All @@ -17,6 +18,11 @@ public static void AddServicesAndRepositories(this IServiceCollection services)
services.AddSingleton<ConditionsLogsRepository>();
services.AddSingleton<CollectionsRepository>();

// Mqtt
services.AddSingleton<MqttClientService>();
services.AddSingleton<MqttSubscriberService>();
services.AddSingleton<MqttPublisherService>();

// Services
services.AddSingleton<WebSocketConnectionService>();
services.AddSingleton<JwtService>();
Expand All @@ -25,8 +31,6 @@ public static void AddServicesAndRepositories(this IServiceCollection services)
services.AddSingleton<CollectionsService>();
services.AddSingleton<ConditionsLogsService>();
services.AddSingleton<RequirementService>();
services.AddSingleton<MqttSubscriberService>();
services.AddSingleton<MqttPublisherService>();
services.AddSingleton<StatsService>();

// Helpers
Expand Down
1 change: 1 addition & 0 deletions api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public static async Task<WebApplication> StartApi(string[] args)
var services = builder.FindAndInjectClientEventHandlers(Assembly.GetExecutingAssembly());

var app = builder.Build();
await app.Services.GetRequiredService<MqttClientService>().ConnectAsync();
await app.Services.GetRequiredService<MqttSubscriberService>().SubscribeAsync();

// be careful with using --db-init on production, it will delete all data
Expand Down

0 comments on commit 1d7414c

Please sign in to comment.