Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add MultiQueue #18933

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,19 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Imaging.MagickNet.
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Imaging.AspNetCore.Tests", "test\Volo.Abp.Imaging.AspNetCore.Tests\Volo.Abp.Imaging.AspNetCore.Tests.csproj", "{983B0136-384B-4439-B374-31111FFAA286}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Maui.Client", "src\Volo.Abp.Maui.Client\Volo.Abp.Maui.Client.csproj", "{F19A6E0C-F719-4ED9-A024-14E4B8D40883}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Maui.Client", "src\Volo.Abp.Maui.Client\Volo.Abp.Maui.Client.csproj", "{F19A6E0C-F719-4ED9-A024-14E4B8D40883}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Imaging.SkiaSharp", "src\Volo.Abp.Imaging.SkiaSharp\Volo.Abp.Imaging.SkiaSharp.csproj", "{198683D0-7DC6-40F2-B81B-8E446E70A9DE}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Imaging.SkiaSharp", "src\Volo.Abp.Imaging.SkiaSharp\Volo.Abp.Imaging.SkiaSharp.csproj", "{198683D0-7DC6-40F2-B81B-8E446E70A9DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Imaging.SkiaSharp.Tests", "test\Volo.Abp.Imaging.SkiaSharp.Tests\Volo.Abp.Imaging.SkiaSharp.Tests.csproj", "{DFAF8763-D1D6-4EB4-B459-20E31007FE2F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Imaging.SkiaSharp.Tests", "test\Volo.Abp.Imaging.SkiaSharp.Tests\Volo.Abp.Imaging.SkiaSharp.Tests.csproj", "{DFAF8763-D1D6-4EB4-B459-20E31007FE2F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.RemoteServices.Tests", "test\Volo.Abp.RemoteServices.Tests\Volo.Abp.RemoteServices.Tests.csproj", "{DACD4485-61BE-4DE5-ACAE-4FFABC122500}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.RemoteServices.Tests", "test\Volo.Abp.RemoteServices.Tests\Volo.Abp.RemoteServices.Tests.csproj", "{DACD4485-61BE-4DE5-ACAE-4FFABC122500}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.MultiQueue", "src\Volo.Abp.MultiQueue\Volo.Abp.MultiQueue.csproj", "{E61D87D0-2E95-4182-A2CF-34457873E7F6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.MultiQueue.Kafka", "src\Volo.Abp.MultiQueue.Kafka\Volo.Abp.MultiQueue.Kafka.csproj", "{12E20B84-E2F3-48AD-8696-A3191ED1AD7E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.MultiQueue.Tests", "test\Volo.Abp.MultiQueue.Tests\Volo.Abp.MultiQueue.Tests.csproj", "{46405A27-6C12-473D-ACA7-A7EC8D00CBAE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -1387,6 +1393,18 @@ Global
{DACD4485-61BE-4DE5-ACAE-4FFABC122500}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DACD4485-61BE-4DE5-ACAE-4FFABC122500}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DACD4485-61BE-4DE5-ACAE-4FFABC122500}.Release|Any CPU.Build.0 = Release|Any CPU
{E61D87D0-2E95-4182-A2CF-34457873E7F6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E61D87D0-2E95-4182-A2CF-34457873E7F6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E61D87D0-2E95-4182-A2CF-34457873E7F6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E61D87D0-2E95-4182-A2CF-34457873E7F6}.Release|Any CPU.Build.0 = Release|Any CPU
{12E20B84-E2F3-48AD-8696-A3191ED1AD7E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{12E20B84-E2F3-48AD-8696-A3191ED1AD7E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{12E20B84-E2F3-48AD-8696-A3191ED1AD7E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{12E20B84-E2F3-48AD-8696-A3191ED1AD7E}.Release|Any CPU.Build.0 = Release|Any CPU
{46405A27-6C12-473D-ACA7-A7EC8D00CBAE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{46405A27-6C12-473D-ACA7-A7EC8D00CBAE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{46405A27-6C12-473D-ACA7-A7EC8D00CBAE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{46405A27-6C12-473D-ACA7-A7EC8D00CBAE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1621,6 +1639,9 @@ Global
{198683D0-7DC6-40F2-B81B-8E446E70A9DE} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{DFAF8763-D1D6-4EB4-B459-20E31007FE2F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{DACD4485-61BE-4DE5-ACAE-4FFABC122500} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{E61D87D0-2E95-4182-A2CF-34457873E7F6} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{12E20B84-E2F3-48AD-8696-A3191ED1AD7E} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{46405A27-6C12-473D-ACA7-A7EC8D00CBAE} = {447C8A77-E5F0-4538-8687-7383196D04EA}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<!--<Nullable>enable</Nullable>-->
<!--<WarningsAsErrors>Nullable</WarningsAsErrors>-->
<AssemblyName>Volo.Abp.MultiQueue.Kafka</AssemblyName>
<PackageId>Volo.Abp.MultiQueue.Kafka</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.MultiQueue\Volo.Abp.MultiQueue.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using Volo.Abp.Modularity;
using Volo.Abp.MultiQueue.Publisher;
using Volo.Abp.MultiQueue.Subscriber;

namespace Volo.Abp.MultiQueue.Kafka;

[DependsOn(typeof(AbpMultiQueueModule))]
public class AbpMultiQueueKafkaModule : AbpMultiQueueModuleBase<KafkaQueueOptions>
{
protected override (Type ServiceType, Type ImplementationType) GetQueuePublisherType(Type optionsType)
{
var implementationType = typeof(KafkaQueuePublisher<>).MakeGenericType(optionsType);
var serviceType = typeof(IQueuePublisher<>).MakeGenericType(optionsType);
return (serviceType, implementationType);
}

protected override (Type ServiceType, Type ImplementationType) GetQueueSubscriberType(Type optionType)
{
var implementationType = typeof(KafkaQueueSubscriber<>).MakeGenericType(optionType);
var serviceType = typeof(IQueueSubscriber<>).MakeGenericType(optionType);
return (serviceType, implementationType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Confluent.Kafka;
using Volo.Abp.MultiQueue.Options;

namespace Volo.Abp.MultiQueue.Kafka;

[QueueOptionsType("Kafka")]
public class KafkaQueueOptions : QueueOptions, IQueueOptions
{
public virtual string Address { get; set; }

public virtual string GroupId { get; set; }

public virtual string UserName { get; set; }

public virtual string Password { get; set; }

public virtual int? MessageMaxBytes { get; set; }

public virtual SecurityProtocol? SecurityProtocol { get; set; }

public virtual SaslMechanism? SaslMechanism { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Volo.Abp.MultiQueue.Publisher;

namespace Volo.Abp.MultiQueue.Kafka;

public class KafkaQueuePublisher<TOptions> : IQueuePublisher<TOptions> where TOptions : KafkaQueueOptions
{
private readonly IOptions<TOptions> _options;
private readonly IProducer<Null, byte[]> _producer;

protected TOptions Options => _options.Value;
protected ILogger Logger { get; }

public KafkaQueuePublisher(ILogger<KafkaQueuePublisher<TOptions>> logger, IOptions<TOptions> options)
{
Logger = logger;
_options = options;
_producer = GetProducer();
}

public async Task PublishAsync(string topic, object data)
{
if (data == null) return;

try
{
byte[] pubData = ToByteData(data);
if (pubData == null) return;

await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = pubData });
}
catch (Exception ex)
{
Logger.LogError(ex, "Kafka PublishAsync Error");
throw;
}
}

public async Task BatchPublishAsync(string topic, object[] data)
{
try
{
foreach (var item in data)
{
byte[] pubData = ToByteData(item);
if (pubData == null) continue;

_producer.Produce(topic, new Message<Null, byte[]> { Value = pubData });
}
_producer.Flush(TimeSpan.FromSeconds(10));
}
catch (Exception ex)
{
Logger.LogError(ex, "Kafka BatchPublishAsync Error");
throw;
}
await Task.CompletedTask;
}

protected virtual byte[] ToByteData(object data)
{
byte[] pubData = null;
if (data is byte[] byteData)
{
pubData = byteData;
}
else if (data is string strData)
{
pubData = System.Text.Encoding.UTF8.GetBytes(strData);
}
else
{
string jsonOrVal = null;
try
{
jsonOrVal = JsonConvert.SerializeObject(data);
}
catch
{
jsonOrVal = data.ToString();
}

if (jsonOrVal != null)
pubData = System.Text.Encoding.UTF8.GetBytes(jsonOrVal);
}

return pubData;
}

protected virtual ProducerConfig GetConfig()
{
var config = new ProducerConfig
{
BootstrapServers = Options.Address,
SaslUsername = Options.UserName,
SaslPassword = Options.Password,
MessageMaxBytes = Options.MessageMaxBytes,
SecurityProtocol = Options.SecurityProtocol,
SaslMechanism = Options.SaslMechanism
};
return config;
}

protected virtual IProducer<Null, byte[]> GetProducer()
{
var config = GetConfig();
return new ProducerBuilder<Null, byte[]>(config).Build();
}

public void Dispose()
{
if (_producer != null)
_producer.Dispose();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Local;
using Volo.Abp.MultiQueue.Options;
using Volo.Abp.MultiQueue.Subscriber;

namespace Volo.Abp.MultiQueue.Kafka;

public class KafkaQueueSubscriber<TQueueOptions> : QueueSubscriber<TQueueOptions> where TQueueOptions : class, IQueueOptions
{
private readonly IOptions<TQueueOptions> _options;
private readonly ILocalEventBus _localEventBus;

protected TQueueOptions Options => _options.Value;

public KafkaQueueSubscriber(IServiceProvider serviceProvider, IOptions<TQueueOptions> options, ILocalEventBus localEventBus) : base(serviceProvider)
{
_options = options;
_localEventBus = localEventBus;
}
protected virtual IConsumer<Ignore, byte[]> GetConsumer(TQueueOptions options)
{
if (options is KafkaQueueOptions kafkaQueueOptions)
{
if (string.IsNullOrWhiteSpace(kafkaQueueOptions.GroupId))
throw new ArgumentNullException($"[GroupId] can`t null or empty.");

if (string.IsNullOrWhiteSpace(kafkaQueueOptions.Address))
throw new ArgumentNullException($"[Address] can`t null or empty.");

var consumer = new ConsumerBuilder<Ignore, byte[]>(new ConsumerConfig
{
BootstrapServers = kafkaQueueOptions.Address,
GroupId = kafkaQueueOptions.GroupId,
SaslUsername = kafkaQueueOptions.UserName,
SaslPassword = kafkaQueueOptions.Password

}).Build();
return consumer;
}
throw new ArgumentException("Options must be [KafkaQueueOptions]");
}

protected override async Task StartQueueAsync(CancellationToken cancellationToken = default)
{
var consumer = GetConsumer(Options);
consumer.Subscribe(EventMap.Keys);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult == null) continue;
if (consumeResult.IsPartitionEOF) continue;

if (EventMap.TryGetValue(consumeResult.Topic, out var eventType))
{
try
{
var data = Activator.CreateInstance(eventType) as IQueueResult;
data.Time = consumeResult.Message.Timestamp.UtcDateTime.ToLocalTime();
data.Source = "Kafka";

var setDataFunc = eventType.GetMethod("SetData", BindingFlags.Instance | BindingFlags.NonPublic);
setDataFunc.Invoke(data, new object[] { consumeResult.Message.Value });

await _localEventBus.PublishAsync(eventType, data);
}
catch (Exception ex)
{
Logger.LogError(ex, "Kafka SubscribeAsync Error");
}
}
}
}
}
28 changes: 28 additions & 0 deletions framework/src/Volo.Abp.MultiQueue/Volo.Abp.MultiQueue.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<!--<Nullable>enable</Nullable>-->
<!--<WarningsAsErrors>Nullable</WarningsAsErrors>-->
<AssemblyName>Volo.Abp.MultiQueue</AssemblyName>
<PackageId>Volo.Abp.MultiQueue</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Volo.Abp.MultiQueue;
public static class AbpMultiQueueConst
{
public const string ConfigurationKey = "MultiQueue";
}
Loading