diff --git a/CHANGELOG.md b/CHANGELOG.md index 060bd91..d803e73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,16 @@ # Changelog -## v1.3.1 (Unreleased) +## v1.4.0 + +### New + +* Support for Azure Functions target-based scaling ([#169](https://github.com/microsoft/durabletask-mssql/pull/169)) +* Added `net6.0` TFM to Microsoft.DurableTask.SqlServer.AzureFunctions ### Updates * Fix SQL retry logic to open a new connection if a previous failure closed the connection ([#221](https://github.com/microsoft/durabletask-mssql/pull/221)) - contributed by [@microrama](https://github.com/microrama) +* Pin Microsoft.Azure.WebJobs.Extensions.DurableTask dependency to 2.13.7 instead of wildcard to avoid accidental build breaks ## v1.3.0 diff --git a/src/DurableTask.SqlServer.AzureFunctions/DurableTask.SqlServer.AzureFunctions.csproj b/src/DurableTask.SqlServer.AzureFunctions/DurableTask.SqlServer.AzureFunctions.csproj index de2acd9..347943a 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/DurableTask.SqlServer.AzureFunctions.csproj +++ b/src/DurableTask.SqlServer.AzureFunctions/DurableTask.SqlServer.AzureFunctions.csproj @@ -4,7 +4,11 @@ - netstandard2.0 + netstandard2.0;net6.0 + + + + $(DefineConstants);FUNCTIONS_V4 @@ -16,7 +20,7 @@ - + diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs index 688a326..e1f8de4 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProvider.cs @@ -23,6 +23,9 @@ class SqlDurabilityProvider : DurabilityProvider readonly SqlOrchestrationService service; SqlScaleMonitor? scaleMonitor; +#if FUNCTIONS_V4 + SqlTargetScaler? targetScaler; +#endif public SqlDurabilityProvider( SqlOrchestrationService service, @@ -197,8 +200,33 @@ public override bool TryGetScaleMonitor( string storageConnectionString, out IScaleMonitor scaleMonitor) { - scaleMonitor = this.scaleMonitor ??= new SqlScaleMonitor(this.service, hubName); + if (this.scaleMonitor == null) + { + var sqlMetricsProvider = new SqlMetricsProvider(this.service); + this.scaleMonitor = new SqlScaleMonitor(hubName, sqlMetricsProvider); + } + + scaleMonitor = this.scaleMonitor; + return true; + } + +#if FUNCTIONS_V4 + public override bool TryGetTargetScaler( + string functionId, + string functionName, + string hubName, + string connectionName, + out ITargetScaler targetScaler) + { + if (this.targetScaler == null) + { + var sqlMetricsProvider = new SqlMetricsProvider(this.service); + this.targetScaler = new SqlTargetScaler(hubName, sqlMetricsProvider); + } + + targetScaler = this.targetScaler; return true; } +#endif } } diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs index 6ceb1f8..0510f16 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs @@ -66,7 +66,7 @@ public DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute lock (this.clientProviders) { string key = GetDurabilityProviderKey(attribute); - if (this.clientProviders.TryGetValue(key, out DurabilityProvider clientProvider)) + if (this.clientProviders.TryGetValue(key, out DurabilityProvider? clientProvider)) { return clientProvider; } diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlMetricsProvider.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlMetricsProvider.cs new file mode 100644 index 0000000..8c17b88 --- /dev/null +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlMetricsProvider.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.AzureFunctions +{ + using System.Threading; + using System.Threading.Tasks; + + public class SqlMetricsProvider + { + readonly SqlOrchestrationService service; + + public SqlMetricsProvider(SqlOrchestrationService service) + { + this.service = service; + } + + public virtual async Task GetMetricsAsync(int? previousWorkerCount = null) + { + // GetRecommendedReplicaCountAsync will write a trace if the recommendation results + // in a worker count that is different from the worker count we pass in as an argument. + int recommendedReplicaCount = await this.service.GetRecommendedReplicaCountAsync( + previousWorkerCount, + CancellationToken.None); + + return new SqlScaleMetric { RecommendedReplicaCount = recommendedReplicaCount }; + } + } +} diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMetric.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMetric.cs index 1bbd337..2203ccc 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMetric.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMetric.cs @@ -5,7 +5,7 @@ namespace DurableTask.SqlServer.AzureFunctions { using Microsoft.Azure.WebJobs.Host.Scale; - class SqlScaleMetric : ScaleMetrics + public class SqlScaleMetric : ScaleMetrics { public int RecommendedReplicaCount { get; set; } } diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs index 447dc3e..66022e1 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs @@ -19,14 +19,22 @@ class SqlScaleMonitor : IScaleMonitor static readonly ScaleStatus NoScaleVote = new ScaleStatus { Vote = ScaleVote.None }; static readonly ScaleStatus ScaleOutVote = new ScaleStatus { Vote = ScaleVote.ScaleOut }; - readonly SqlOrchestrationService service; + readonly SqlMetricsProvider metricsProvider; int? previousWorkerCount = -1; - public SqlScaleMonitor(SqlOrchestrationService service, string taskHubName) + public SqlScaleMonitor(string taskHubName, SqlMetricsProvider sqlMetricsProvider) { - this.service = service ?? throw new ArgumentNullException(nameof(service)); - this.Descriptor = new ScaleMonitorDescriptor($"DurableTask-SqlServer:{taskHubName ?? "default"}"); + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + string id = $"DurableTask-SqlServer:{taskHubName ?? "default"}"; + +#if FUNCTIONS_V4 + this.Descriptor = new ScaleMonitorDescriptor(id: id, functionId: id); +#else + this.Descriptor = new ScaleMonitorDescriptor(id); +#endif + this.metricsProvider = sqlMetricsProvider ?? throw new ArgumentNullException(nameof(sqlMetricsProvider)); } /// @@ -38,13 +46,7 @@ public SqlScaleMonitor(SqlOrchestrationService service, string taskHubName) /// public async Task GetMetricsAsync() { - // GetRecommendedReplicaCountAsync will write a trace if the recommendation results - // in a worker count that is different from the worker count we pass in as an argument. - int recommendedReplicaCount = await this.service.GetRecommendedReplicaCountAsync( - this.previousWorkerCount, - CancellationToken.None); - - return new SqlScaleMetric { RecommendedReplicaCount = recommendedReplicaCount }; + return await this.metricsProvider.GetMetricsAsync(this.previousWorkerCount); } /// diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlTargetScaler.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlTargetScaler.cs new file mode 100644 index 0000000..116efae --- /dev/null +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlTargetScaler.cs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#if FUNCTIONS_V4 +namespace DurableTask.SqlServer.AzureFunctions +{ + using System; + using System.Threading.Tasks; + using Microsoft.Azure.WebJobs.Host.Scale; + + public class SqlTargetScaler : ITargetScaler + { + readonly SqlMetricsProvider sqlMetricsProvider; + + public SqlTargetScaler(string taskHubName, SqlMetricsProvider sqlMetricsProvider) + { + this.sqlMetricsProvider = sqlMetricsProvider; + + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + string id = $"DurableTask-SqlServer:{taskHubName ?? "default"}"; + this.TargetScalerDescriptor = new TargetScalerDescriptor(id); + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; } + + public async Task GetScaleResultAsync(TargetScalerContext context) + { + SqlScaleMetric sqlScaleMetric = await this.sqlMetricsProvider.GetMetricsAsync(); + return new TargetScalerResult + { + TargetWorkerCount = Math.Max(0, sqlScaleMetric.RecommendedReplicaCount), + }; + } + } +} +#endif diff --git a/src/common.props b/src/common.props index ace5c3e..6ee3a62 100644 --- a/src/common.props +++ b/src/common.props @@ -16,7 +16,7 @@ 1 - 3 + 4 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) diff --git a/test/DurableTask.SqlServer.AzureFunctions.Tests/TargetBasedScalingTests.cs b/test/DurableTask.SqlServer.AzureFunctions.Tests/TargetBasedScalingTests.cs new file mode 100644 index 0000000..9c6a793 --- /dev/null +++ b/test/DurableTask.SqlServer.AzureFunctions.Tests/TargetBasedScalingTests.cs @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer.AzureFunctions.Tests +{ + using DurableTask.Core; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Azure.WebJobs.Host.Scale; + using Moq; + using Xunit; + + public class TargetBasedScalingTests + { + readonly Mock metricsProviderMock; + readonly Mock orchestrationServiceMock; + + public TargetBasedScalingTests() + { + this.orchestrationServiceMock = new Mock(MockBehavior.Strict); + + SqlOrchestrationService? nullServiceArg = null; // not needed for this test + this.metricsProviderMock = new Mock( + behavior: MockBehavior.Strict, + nullServiceArg); + } + + [Theory] + [InlineData(0)] + [InlineData(10)] + [InlineData(20)] + public async void TargetBasedScalingTest(int expectedTargetWorkerCount) + { + var durabilityProviderMock = new Mock( + MockBehavior.Strict, + "storageProviderName", + this.orchestrationServiceMock.Object, + new Mock().Object, + "connectionName"); + + var sqlScaleMetric = new SqlScaleMetric() + { + RecommendedReplicaCount = expectedTargetWorkerCount, + }; + + this.metricsProviderMock.Setup(m => m.GetMetricsAsync(null)).ReturnsAsync(sqlScaleMetric); + + var targetScaler = new SqlTargetScaler( + "functionId", + this.metricsProviderMock.Object); + + TargetScalerResult result = await targetScaler.GetScaleResultAsync(new TargetScalerContext()); + + Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); + } + } +} diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 3f4f024..40e1692 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -503,7 +503,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName database.ConnectionString, schemaName); Assert.Equal(1, currentSchemaVersion.Major); - Assert.Equal(3, currentSchemaVersion.Minor); + Assert.Equal(4, currentSchemaVersion.Minor); Assert.Equal(0, currentSchemaVersion.Patch); }