From ba5855e5997998ea4ce7ae8ab58c2c44eba8d85d Mon Sep 17 00:00:00 2001 From: "Eli C. Lowry" <83078660+Enkidu93@users.noreply.github.com> Date: Fri, 13 Oct 2023 10:32:48 -0400 Subject: [PATCH] Queue depth endpoint and status message (#177) * Support basic queue depth endpoint * Working queue endpoint + position in queue added to build status message * Move queueDepth to build property --- docker-compose.yml | 2 +- .../TranslationEngineServiceV1.cs | 5 + src/Serval.Client/Client.g.cs | 120 ++++++++++++++++++ .../Protos/serval/translation/v1/engine.proto | 9 ++ .../serval/translation/v1/platform.proto | 1 + src/Serval.Translation/Contracts/QueueDto.cs | 7 + .../Contracts/TranslationBuildDto.cs | 2 + .../TranslationEnginesController.cs | 33 +++++ src/Serval.Translation/Models/Build.cs | 1 + src/Serval.Translation/Models/Queue.cs | 7 + .../Services/EngineService.cs | 10 ++ .../Services/IEngineService.cs | 2 + .../Services/TranslationPlatformServiceV1.cs | 2 + tests/Serval.E2ETests/ServalApiTests.cs | 58 +++++++++ 14 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 src/Serval.Translation/Contracts/QueueDto.cs create mode 100644 src/Serval.Translation/Models/Queue.cs diff --git a/docker-compose.yml b/docker-compose.yml index fd9a50fa..61d8a9a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -136,7 +136,7 @@ services: - ASPNETCORE_TranslationEngines__0=SmtTransfer - ASPNETCORE_TranslationEngines__1=Nmt - ClearML__ApiServer=https://api.sil.hosted.allegro.ai - - ClearML__Queue=lambert_24gb + - ClearML__Queue=production - ClearML__DockerImage=ghcr.io/sillsdev/machine.py:0.9.5.1 - "ClearML__AccessKey=${ClearML_AccessKey:?access key needed}" - "ClearML__SecretKey=${ClearML_SecretKey:?secret key needed}" diff --git a/samples/EchoTranslationEngine/TranslationEngineServiceV1.cs b/samples/EchoTranslationEngine/TranslationEngineServiceV1.cs index e0ae2e09..756f86e0 100644 --- a/samples/EchoTranslationEngine/TranslationEngineServiceV1.cs +++ b/samples/EchoTranslationEngine/TranslationEngineServiceV1.cs @@ -293,4 +293,9 @@ public override Task GetWordGraph(GetWordGraphRequest requ } ); } + + public override Task GetQueueSize(GetQueueSizeRequest request, ServerCallContext context) + { + return Task.FromResult(new GetQueueSizeResponse { Size = 0 }); + } } diff --git a/src/Serval.Client/Client.g.cs b/src/Serval.Client/Client.g.cs index f48a41cc..239a678c 100644 --- a/src/Serval.Client/Client.g.cs +++ b/src/Serval.Client/Client.g.cs @@ -846,6 +846,15 @@ public partial interface ITranslationEnginesClient /// A server side error occurred. System.Threading.Tasks.Task DeleteAsync(string id, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. + /// + /// Get queue information for a given engine type + /// + /// A valid engine type: SmtTransfer, Nmt, or Echo + /// Queue information for the specified engine type + /// A server side error occurred. + System.Threading.Tasks.Task GetQueueAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// /// Translate a segment of text @@ -1511,6 +1520,102 @@ public string BaseUrl } } + /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. + /// + /// Get queue information for a given engine type + /// + /// A valid engine type: SmtTransfer, Nmt, or Echo + /// Queue information for the specified engine type + /// A server side error occurred. + public virtual async System.Threading.Tasks.Task GetQueueAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) + { + if (engineType == null) + throw new System.ArgumentNullException("engineType"); + + var urlBuilder_ = new System.Text.StringBuilder(); + urlBuilder_.Append(BaseUrl != null ? BaseUrl.TrimEnd('/') : "").Append("/translation/engines/queues"); + + var client_ = _httpClient; + var disposeClient_ = false; + try + { + using (var request_ = new System.Net.Http.HttpRequestMessage()) + { + var json_ = Newtonsoft.Json.JsonConvert.SerializeObject(engineType, _settings.Value); + var content_ = new System.Net.Http.StringContent(json_); + content_.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json"); + request_.Content = content_; + request_.Method = new System.Net.Http.HttpMethod("GET"); + request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json")); + + PrepareRequest(client_, request_, urlBuilder_); + + var url_ = urlBuilder_.ToString(); + request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute); + + PrepareRequest(client_, request_, url_); + + var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + var disposeResponse_ = true; + try + { + var headers_ = System.Linq.Enumerable.ToDictionary(response_.Headers, h_ => h_.Key, h_ => h_.Value); + if (response_.Content != null && response_.Content.Headers != null) + { + foreach (var item_ in response_.Content.Headers) + headers_[item_.Key] = item_.Value; + } + + ProcessResponse(client_, response_); + + var status_ = (int)response_.StatusCode; + if (status_ == 200) + { + var objectResponse_ = await ReadObjectResponseAsync(response_, headers_, cancellationToken).ConfigureAwait(false); + if (objectResponse_.Object == null) + { + throw new ServalApiException("Response was null which was not expected.", status_, objectResponse_.Text, headers_, null); + } + return objectResponse_.Object; + } + else + if (status_ == 401) + { + string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false); + throw new ServalApiException("The client is not authenticated", status_, responseText_, headers_, null); + } + else + if (status_ == 403) + { + string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false); + throw new ServalApiException("The authenticated client cannot perform the operation", status_, responseText_, headers_, null); + } + else + if (status_ == 503) + { + string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false); + throw new ServalApiException("A necessary service is currently unavailable. Check `/health` for more details. ", status_, responseText_, headers_, null); + } + else + { + var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false); + throw new ServalApiException("The HTTP status code of the response was not expected (" + status_ + ").", status_, responseData_, headers_, null); + } + } + finally + { + if (disposeResponse_) + response_.Dispose(); + } + } + } + finally + { + if (disposeClient_) + client_.Dispose(); + } + } + /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// /// Translate a segment of text @@ -4017,6 +4122,18 @@ public partial class TranslationEngineConfig } + [System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")] + public partial class Queue + { + [Newtonsoft.Json.JsonProperty("size", Required = Newtonsoft.Json.Required.Always)] + public int Size { get; set; } = default!; + + [Newtonsoft.Json.JsonProperty("engineType", Required = Newtonsoft.Json.Required.Always)] + [System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)] + public string EngineType { get; set; } = default!; + + } + [System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")] public partial class TranslationResult { @@ -4323,6 +4440,9 @@ public partial class TranslationBuild [Newtonsoft.Json.JsonProperty("message", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] public string? Message { get; set; } = default!; + [Newtonsoft.Json.JsonProperty("queueDepth", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)] + public int? QueueDepth { get; set; } = default!; + /// /// The current build job state. /// diff --git a/src/Serval.Grpc/Protos/serval/translation/v1/engine.proto b/src/Serval.Grpc/Protos/serval/translation/v1/engine.proto index 08f258fb..f98478ca 100644 --- a/src/Serval.Grpc/Protos/serval/translation/v1/engine.proto +++ b/src/Serval.Grpc/Protos/serval/translation/v1/engine.proto @@ -12,6 +12,7 @@ service TranslationEngineApi { rpc TrainSegmentPair(TrainSegmentPairRequest) returns (google.protobuf.Empty); rpc StartBuild(StartBuildRequest) returns (google.protobuf.Empty); rpc CancelBuild(CancelBuildRequest) returns (google.protobuf.Empty); + rpc GetQueueSize(GetQueueSizeRequest) returns (GetQueueSizeResponse); } message CreateRequest { @@ -69,6 +70,14 @@ message CancelBuildRequest { string engine_id = 2; } +message GetQueueSizeRequest { + string engine_type = 1; +} + +message GetQueueSizeResponse { + int32 size = 1; +} + message AlignedWordPair { int32 source_index = 1; int32 target_index = 2; diff --git a/src/Serval.Grpc/Protos/serval/translation/v1/platform.proto b/src/Serval.Grpc/Protos/serval/translation/v1/platform.proto index a56680ae..0c5773e3 100644 --- a/src/Serval.Grpc/Protos/serval/translation/v1/platform.proto +++ b/src/Serval.Grpc/Protos/serval/translation/v1/platform.proto @@ -21,6 +21,7 @@ message UpdateBuildStatusRequest { int32 step = 2; optional double percent_completed = 3; optional string message = 4; + optional int32 queue_depth = 5; } message BuildStartedRequest { diff --git a/src/Serval.Translation/Contracts/QueueDto.cs b/src/Serval.Translation/Contracts/QueueDto.cs new file mode 100644 index 00000000..0e19d34a --- /dev/null +++ b/src/Serval.Translation/Contracts/QueueDto.cs @@ -0,0 +1,7 @@ +namespace Serval.Translation.Contracts; + +public class QueueDto +{ + public int Size { get; set; } = default; + public string EngineType { get; set; } = default!; +} diff --git a/src/Serval.Translation/Contracts/TranslationBuildDto.cs b/src/Serval.Translation/Contracts/TranslationBuildDto.cs index 3e914b44..65fa4558 100644 --- a/src/Serval.Translation/Contracts/TranslationBuildDto.cs +++ b/src/Serval.Translation/Contracts/TranslationBuildDto.cs @@ -12,6 +12,8 @@ public class TranslationBuildDto public double? PercentCompleted { get; set; } public string? Message { get; set; } + public int? QueueDepth { get; set; } + /// /// The current build job state. /// diff --git a/src/Serval.Translation/Controllers/TranslationEnginesController.cs b/src/Serval.Translation/Controllers/TranslationEnginesController.cs index 953063bd..689cfd58 100644 --- a/src/Serval.Translation/Controllers/TranslationEnginesController.cs +++ b/src/Serval.Translation/Controllers/TranslationEnginesController.cs @@ -182,6 +182,36 @@ public async Task DeleteAsync([NotNull] string id, CancellationTok return Ok(); } + /// + /// Get queue information for a given engine type + /// + /// A valid engine type: SmtTransfer, Nmt, or Echo + /// + /// Queue information for the specified engine type + /// The client is not authenticated + /// The authenticated client cannot perform the operation + /// A necessary service is currently unavailable. Check `/health` for more details. + [Authorize(Scopes.ReadTranslationEngines)] + [HttpGet("queues")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(typeof(void), StatusCodes.Status401Unauthorized)] + [ProducesResponseType(typeof(void), StatusCodes.Status403Forbidden)] + [ProducesResponseType(typeof(void), StatusCodes.Status503ServiceUnavailable)] + public async Task> GetQueueAsync( + [FromBody] string engineType, + CancellationToken cancellationToken + ) + { + try + { + return Map(await _engineService.GetQueueAsync(engineType, cancellationToken)); + } + catch (InvalidOperationException ioe) + { + return BadRequest(ioe.Message); + } + } + /// /// Translate a segment of text /// @@ -1000,6 +1030,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source) return build; } + private QueueDto Map(Queue source) => new() { Size = source.Size, EngineType = source.EngineType }; + private TranslationEngineDto Map(Engine source) { return new TranslationEngineDto @@ -1034,6 +1066,7 @@ private TranslationBuildDto Map(Build source) Step = source.Step, PercentCompleted = source.PercentCompleted, Message = source.Message, + QueueDepth = source.QueueDepth, State = source.State, DateFinished = source.DateFinished, Options = JsonSerializer.Serialize(source.Options) diff --git a/src/Serval.Translation/Models/Build.cs b/src/Serval.Translation/Models/Build.cs index d934a35c..fbfe2b3f 100644 --- a/src/Serval.Translation/Models/Build.cs +++ b/src/Serval.Translation/Models/Build.cs @@ -10,6 +10,7 @@ public class Build : IEntity public int Step { get; set; } public double? PercentCompleted { get; set; } public string? Message { get; set; } + public int? QueueDepth { get; set; } public JobState State { get; set; } = JobState.Pending; public DateTime? DateFinished { get; set; } public IDictionary? Options { get; set; } diff --git a/src/Serval.Translation/Models/Queue.cs b/src/Serval.Translation/Models/Queue.cs new file mode 100644 index 00000000..74b62688 --- /dev/null +++ b/src/Serval.Translation/Models/Queue.cs @@ -0,0 +1,7 @@ +namespace Serval.Translation.Models; + +public class Queue +{ + public int Size { get; set; } = default; + public string EngineType { get; set; } = default!; +} diff --git a/src/Serval.Translation/Services/EngineService.cs b/src/Serval.Translation/Services/EngineService.cs index 408cb9f3..be39dbed 100644 --- a/src/Serval.Translation/Services/EngineService.cs +++ b/src/Serval.Translation/Services/EngineService.cs @@ -298,6 +298,16 @@ public Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cance ); } + public async Task GetQueueAsync(string engineType, CancellationToken cancellationToken = default) + { + var client = _grpcClientFactory.CreateClient(engineType); + GetQueueSizeResponse response = await client.GetQueueSizeAsync( + new GetQueueSizeRequest { EngineType = engineType }, + cancellationToken: cancellationToken + ); + return new Queue { Size = response.Size, EngineType = engineType }; + } + private Models.TranslationResult Map(V1.TranslationResult source) { return new Models.TranslationResult diff --git a/src/Serval.Translation/Services/IEngineService.cs b/src/Serval.Translation/Services/IEngineService.cs index 0046bb24..a3397eef 100644 --- a/src/Serval.Translation/Services/IEngineService.cs +++ b/src/Serval.Translation/Services/IEngineService.cs @@ -46,4 +46,6 @@ Task TrainSegmentPairAsync( Task DeleteCorpusAsync(string engineId, string corpusId, CancellationToken cancellationToken = default); Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cancellationToken = default); + + Task GetQueueAsync(string engineType, CancellationToken cancellationToken = default); } diff --git a/src/Serval.Translation/Services/TranslationPlatformServiceV1.cs b/src/Serval.Translation/Services/TranslationPlatformServiceV1.cs index 665ac0fd..2b6a4edd 100644 --- a/src/Serval.Translation/Services/TranslationPlatformServiceV1.cs +++ b/src/Serval.Translation/Services/TranslationPlatformServiceV1.cs @@ -246,6 +246,8 @@ await _builds.UpdateAsync( } if (request.HasMessage) u.Set(b => b.Message, request.Message); + if (request.HasQueueDepth) + u.Set(b => b.QueueDepth, request.QueueDepth); }, cancellationToken: context.CancellationToken ); diff --git a/tests/Serval.E2ETests/ServalApiTests.cs b/tests/Serval.E2ETests/ServalApiTests.cs index 1ba907d5..06b1f1c1 100644 --- a/tests/Serval.E2ETests/ServalApiTests.cs +++ b/tests/Serval.E2ETests/ServalApiTests.cs @@ -133,6 +133,64 @@ public async Task NmtBatch() Assert.IsTrue(lTrans[0].Translation.Contains("dearly beloved Gaius")); } + [Test] + public async Task NmtQueueMultiple() + { + await _helperClient!.ClearEngines(); + const int NUM_ENGINES = 9; + const int NUM_WORKERS = 6; + string[] engineIds = new string[NUM_ENGINES]; + for (int i = 0; i < NUM_ENGINES; i++) + { + _helperClient.TranslationBuildConfig = new() + { + Pretranslate = new List(), + Options = "{\"max_steps\":10}" + }; + engineIds[i] = await _helperClient.CreateNewEngine("Nmt", "es", "en", $"NMT1_{i}"); + string engineId = engineIds[i]; + var books = new string[] { "MAT.txt", "1JN.txt", "2JN.txt" }; + await _helperClient.AddTextCorpusToEngine(engineId, books, "es", "en", false); + await _helperClient.AddTextCorpusToEngine(engineId, new string[] { "3JN.txt" }, "es", "en", true); + await _helperClient.StartBuildAsync(engineId); + //Ensure that tasks are enqueued roughly in order + await Task.Delay(500); + } + //Wait for at least some tasks to be queued + await Task.Delay(20_000); + string builds = ""; + for (int i = 0; i < NUM_ENGINES; i++) + { + TranslationBuild build = await _helperClient.translationEnginesClient.GetCurrentBuildAsync(engineIds[i]); + builds += $"{JsonSerializer.Serialize(build)}\n"; + } + + builds += "Depth = " + (await _helperClient.translationEnginesClient.GetQueueAsync("Nmt")).Size.ToString(); + + //Status message of last started build says that there is at least one job ahead of it in the queue + // (this variable due to how many jobs may already exist in the production queue from other Serval instances) + TranslationBuild newestEngineCurrentBuild = await _helperClient.translationEnginesClient.GetCurrentBuildAsync( + engineIds[NUM_ENGINES - 1] + ); + Assert.NotNull(newestEngineCurrentBuild.QueueDepth, JsonSerializer.Serialize(newestEngineCurrentBuild)); + Assert.Multiple(async () => + { + Assert.That(newestEngineCurrentBuild.QueueDepth, Is.GreaterThan(0), message: builds); + Assert.That( + (await _helperClient.translationEnginesClient.GetQueueAsync("Nmt")).Size, + Is.GreaterThanOrEqualTo(NUM_ENGINES - NUM_WORKERS) + ); + }); + for (int i = 0; i < NUM_ENGINES; i++) + { + try + { + await _helperClient.translationEnginesClient.CancelBuildAsync(engineIds[i]); + } + catch { } + } + } + [Test] public async Task NmtLargeBatch() {