Skip to content

Commit

Permalink
Queue depth endpoint and status message (#177)
Browse files Browse the repository at this point in the history
* Support basic queue depth endpoint

* Working queue endpoint + position in queue added to build status message

* Move queueDepth to build property
  • Loading branch information
Enkidu93 authored Oct 13, 2023
1 parent 5e0f893 commit ba5855e
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ services:
- ASPNETCORE_TranslationEngines__0=SmtTransfer
- ASPNETCORE_TranslationEngines__1=Nmt
- ClearML__ApiServer=https://api.sil.hosted.allegro.ai
- ClearML__Queue=lambert_24gb
- ClearML__Queue=production
- ClearML__DockerImage=ghcr.io/sillsdev/machine.py:0.9.5.1
- "ClearML__AccessKey=${ClearML_AccessKey:?access key needed}"
- "ClearML__SecretKey=${ClearML_SecretKey:?secret key needed}"
Expand Down
5 changes: 5 additions & 0 deletions samples/EchoTranslationEngine/TranslationEngineServiceV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,9 @@ public override Task<GetWordGraphResponse> GetWordGraph(GetWordGraphRequest requ
}
);
}

// The echo sample engine has no build queue to inspect, so it always reports a size of zero.
// Both the request and the call context are ignored.
public override Task<GetQueueSizeResponse> GetQueueSize(GetQueueSizeRequest request, ServerCallContext context)
{
    var emptyQueue = new GetQueueSizeResponse();
    emptyQueue.Size = 0;
    return Task.FromResult(emptyQueue);
}
}
120 changes: 120 additions & 0 deletions src/Serval.Client/Client.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ public partial interface ITranslationEnginesClient
/// <exception cref="ServalApiException">A server side error occurred.</exception>
System.Threading.Tasks.Task DeleteAsync(string id, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Get queue information for a given engine type
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <returns>Queue information for the specified engine type</returns>
/// <exception cref="ServalApiException">A server side error occurred.</exception>
System.Threading.Tasks.Task<Queue> GetQueueAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Translate a segment of text
Expand Down Expand Up @@ -1511,6 +1520,102 @@ public string BaseUrl
}
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Get queue information for a given engine type
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <returns>Queue information for the specified engine type</returns>
/// <exception cref="ServalApiException">A server side error occurred.</exception>
// Sends a GET request to "{BaseUrl}/translation/engines/queues" with the engine type
// JSON-serialized as the request body, and deserializes a 200 response into a Queue.
// Any other status code is surfaced as a ServalApiException carrying the status,
// the response text and the merged response headers.
// NOTE(review): the engine type travels in the body of a GET request. Some HTTP stacks and
// intermediaries reject or drop GET bodies - confirm this matches the server-side binding
// before relying on it. This file is generated, so any change belongs in the API definition.
public virtual async System.Threading.Tasks.Task<Queue> GetQueueAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken))
{
if (engineType == null)
throw new System.ArgumentNullException("engineType");

var urlBuilder_ = new System.Text.StringBuilder();
urlBuilder_.Append(BaseUrl != null ? BaseUrl.TrimEnd('/') : "").Append("/translation/engines/queues");

// The shared _httpClient is reused; disposeClient_ stays false so it is never disposed here.
var client_ = _httpClient;
var disposeClient_ = false;
try
{
using (var request_ = new System.Net.Http.HttpRequestMessage())
{
// The engine type is serialized as a JSON string and attached as the request content.
var json_ = Newtonsoft.Json.JsonConvert.SerializeObject(engineType, _settings.Value);
var content_ = new System.Net.Http.StringContent(json_);
content_.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json");
request_.Content = content_;
request_.Method = new System.Net.Http.HttpMethod("GET");
request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));

// Extension hook invoked before the URL is finalized.
PrepareRequest(client_, request_, urlBuilder_);

var url_ = urlBuilder_.ToString();
request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute);

// Extension hook invoked with the final URL string.
PrepareRequest(client_, request_, url_);

var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var disposeResponse_ = true;
try
{
// Merge response headers and content headers into one dictionary; content headers win on key clashes.
var headers_ = System.Linq.Enumerable.ToDictionary(response_.Headers, h_ => h_.Key, h_ => h_.Value);
if (response_.Content != null && response_.Content.Headers != null)
{
foreach (var item_ in response_.Content.Headers)
headers_[item_.Key] = item_.Value;
}

ProcessResponse(client_, response_);

var status_ = (int)response_.StatusCode;
if (status_ == 200)
{
// A 200 must carry a deserializable Queue; a null payload is treated as an error.
var objectResponse_ = await ReadObjectResponseAsync<Queue>(response_, headers_, cancellationToken).ConfigureAwait(false);
if (objectResponse_.Object == null)
{
throw new ServalApiException("Response was null which was not expected.", status_, objectResponse_.Text, headers_, null);
}
return objectResponse_.Object;
}
else
if (status_ == 401)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The client is not authenticated", status_, responseText_, headers_, null);
}
else
if (status_ == 403)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The authenticated client cannot perform the operation", status_, responseText_, headers_, null);
}
else
if (status_ == 503)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("A necessary service is currently unavailable. Check `/health` for more details. ", status_, responseText_, headers_, null);
}
else
{
// Every status code without a dedicated branch above ends up here.
var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The HTTP status code of the response was not expected (" + status_ + ").", status_, responseData_, headers_, null);
}
}
finally
{
if (disposeResponse_)
response_.Dispose();
}
}
}
finally
{
if (disposeClient_)
client_.Dispose();
}
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Translate a segment of text
Expand Down Expand Up @@ -4017,6 +4122,18 @@ public partial class TranslationEngineConfig

}

/// <summary>
/// Queue information for an engine type, as returned by <c>GetQueueAsync</c>.
/// Both JSON properties are required when deserializing.
/// </summary>
[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")]
public partial class Queue
{
/// <summary>
/// The queue size reported for the engine type (JSON property "size").
/// </summary>
[Newtonsoft.Json.JsonProperty("size", Required = Newtonsoft.Json.Required.Always)]
public int Size { get; set; } = default!;

/// <summary>
/// The engine type this queue information belongs to (JSON property "engineType").
/// </summary>
[Newtonsoft.Json.JsonProperty("engineType", Required = Newtonsoft.Json.Required.Always)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
public string EngineType { get; set; } = default!;

}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")]
public partial class TranslationResult
{
Expand Down Expand Up @@ -4323,6 +4440,9 @@ public partial class TranslationBuild
[Newtonsoft.Json.JsonProperty("message", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public string? Message { get; set; } = default!;

[Newtonsoft.Json.JsonProperty("queueDepth", Required = Newtonsoft.Json.Required.Default, NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore)]
public int? QueueDepth { get; set; } = default!;

/// <summary>
/// The current build job state.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions src/Serval.Grpc/Protos/serval/translation/v1/engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service TranslationEngineApi {
rpc TrainSegmentPair(TrainSegmentPairRequest) returns (google.protobuf.Empty);
rpc StartBuild(StartBuildRequest) returns (google.protobuf.Empty);
rpc CancelBuild(CancelBuildRequest) returns (google.protobuf.Empty);
rpc GetQueueSize(GetQueueSizeRequest) returns (GetQueueSizeResponse);
}

message CreateRequest {
Expand Down Expand Up @@ -69,6 +70,14 @@ message CancelBuildRequest {
string engine_id = 2;
}

// Request message for the TranslationEngineApi.GetQueueSize RPC.
message GetQueueSizeRequest {
// The engine type whose queue size is being requested.
string engine_type = 1;
}

// Response message for the TranslationEngineApi.GetQueueSize RPC.
message GetQueueSizeResponse {
// The queue size reported by the engine implementation.
// NOTE(review): what exactly is counted (pending jobs only, or running ones too) is
// decided by each engine - confirm against the engine implementation.
int32 size = 1;
}

message AlignedWordPair {
int32 source_index = 1;
int32 target_index = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ message UpdateBuildStatusRequest {
int32 step = 2;
optional double percent_completed = 3;
optional string message = 4;
optional int32 queue_depth = 5;
}

message BuildStartedRequest {
Expand Down
7 changes: 7 additions & 0 deletions src/Serval.Translation/Contracts/QueueDto.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Contracts;

/// <summary>
/// API contract describing the queue of a translation engine type.
/// </summary>
public class QueueDto
{
    /// <summary>
    /// The queue size; zero until it is assigned.
    /// </summary>
    public int Size { get; set; }

    /// <summary>
    /// The engine type the queue belongs to; expected to be assigned by whoever creates the DTO.
    /// </summary>
    public string EngineType { get; set; } = default!;
}
2 changes: 2 additions & 0 deletions src/Serval.Translation/Contracts/TranslationBuildDto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class TranslationBuildDto
public double? PercentCompleted { get; set; }
public string? Message { get; set; }

public int? QueueDepth { get; set; }

/// <summary>
/// The current build job state.
/// </summary>
Expand Down
33 changes: 33 additions & 0 deletions src/Serval.Translation/Controllers/TranslationEnginesController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,36 @@ public async Task<ActionResult> DeleteAsync([NotNull] string id, CancellationTok
return Ok();
}

/// <summary>
/// Get queue information for a given engine type
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <param name="cancellationToken"></param>
/// <response code="200">Queue information for the specified engine type</response>
/// <response code="400">The queue information could not be retrieved for the specified engine type</response>
/// <response code="401">The client is not authenticated</response>
/// <response code="403">The authenticated client cannot perform the operation</response>
/// <response code="503">A necessary service is currently unavailable. Check `/health` for more details. </response>
[Authorize(Scopes.ReadTranslationEngines)]
[HttpGet("queues")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(void), StatusCodes.Status400BadRequest)]
[ProducesResponseType(typeof(void), StatusCodes.Status401Unauthorized)]
[ProducesResponseType(typeof(void), StatusCodes.Status403Forbidden)]
[ProducesResponseType(typeof(void), StatusCodes.Status503ServiceUnavailable)]
public async Task<ActionResult<QueueDto>> GetQueueAsync(
    // NOTE(review): binding a GET parameter from the request body is unusual and not supported by
    // every HTTP client or proxy. Changing the binding also requires regenerating the client, so it
    // is left as-is here - confirm whether a query/route parameter or a POST is preferable.
    [FromBody] string engineType,
    CancellationToken cancellationToken
)
{
    try
    {
        return Map(await _engineService.GetQueueAsync(engineType, cancellationToken));
    }
    catch (InvalidOperationException ioe)
    {
        // An invalid operation reported by the engine service is returned to the caller as a
        // 400 with the exception message as the response body.
        return BadRequest(ioe.Message);
    }
}

/// <summary>
/// Translate a segment of text
/// </summary>
Expand Down Expand Up @@ -1000,6 +1030,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source)
return build;
}

// Copies the queue model's size and engine type into the API contract type.
private QueueDto Map(Queue source)
{
    return new QueueDto { Size = source.Size, EngineType = source.EngineType };
}

private TranslationEngineDto Map(Engine source)
{
return new TranslationEngineDto
Expand Down Expand Up @@ -1034,6 +1066,7 @@ private TranslationBuildDto Map(Build source)
Step = source.Step,
PercentCompleted = source.PercentCompleted,
Message = source.Message,
QueueDepth = source.QueueDepth,
State = source.State,
DateFinished = source.DateFinished,
Options = JsonSerializer.Serialize(source.Options)
Expand Down
1 change: 1 addition & 0 deletions src/Serval.Translation/Models/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class Build : IEntity
public int Step { get; set; }
public double? PercentCompleted { get; set; }
public string? Message { get; set; }
public int? QueueDepth { get; set; }
public JobState State { get; set; } = JobState.Pending;
public DateTime? DateFinished { get; set; }
public IDictionary<string, object>? Options { get; set; }
Expand Down
7 changes: 7 additions & 0 deletions src/Serval.Translation/Models/Queue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Models;

/// <summary>
/// Queue information for a translation engine type.
/// </summary>
public class Queue
{
    /// <summary>
    /// The queue size; zero until it is assigned.
    /// </summary>
    public int Size { get; set; }

    /// <summary>
    /// The engine type the queue belongs to; expected to be assigned by whoever creates the instance.
    /// </summary>
    public string EngineType { get; set; } = default!;
}
10 changes: 10 additions & 0 deletions src/Serval.Translation/Services/EngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ public Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cance
);
}

// Asks the translation engine registered under the given engine type for its queue size and
// wraps the answer, together with the engine type, in a Queue model.
// The gRPC client is looked up by name, using the engine type as the client name.
public async Task<Queue> GetQueueAsync(string engineType, CancellationToken cancellationToken = default)
{
    TranslationEngineApi.TranslationEngineApiClient engineClient =
        _grpcClientFactory.CreateClient<TranslationEngineApi.TranslationEngineApiClient>(engineType);

    var sizeRequest = new GetQueueSizeRequest { EngineType = engineType };
    GetQueueSizeResponse sizeReply = await engineClient.GetQueueSizeAsync(
        sizeRequest,
        cancellationToken: cancellationToken
    );

    var queue = new Queue();
    queue.Size = sizeReply.Size;
    queue.EngineType = engineType;
    return queue;
}

private Models.TranslationResult Map(V1.TranslationResult source)
{
return new Models.TranslationResult
Expand Down
2 changes: 2 additions & 0 deletions src/Serval.Translation/Services/IEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ Task<bool> TrainSegmentPairAsync(
Task<bool> DeleteCorpusAsync(string engineId, string corpusId, CancellationToken cancellationToken = default);

Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cancellationToken = default);

Task<Queue> GetQueueAsync(string engineType, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ await _builds.UpdateAsync(
}
if (request.HasMessage)
u.Set(b => b.Message, request.Message);
if (request.HasQueueDepth)
u.Set(b => b.QueueDepth, request.QueueDepth);
},
cancellationToken: context.CancellationToken
);
Expand Down
58 changes: 58 additions & 0 deletions tests/Serval.E2ETests/ServalApiTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,64 @@ public async Task NmtBatch()
Assert.IsTrue(lTrans[0].Translation.Contains("dearly beloved Gaius"));
}

// Starts more NMT builds than there are workers, then checks that the most recently started
// build reports a positive queue depth and that the queue endpoint reports at least
// (engines - workers) queued jobs. All started builds are cancelled afterwards, even when an
// assertion or an earlier call fails, so that no jobs are left behind on the shared queue.
[Test]
public async Task NmtQueueMultiple()
{
    await _helperClient!.ClearEngines();
    const int NumEngines = 9;
    const int NumWorkers = 6;
    // Only engines that were actually created are tracked, so cleanup never sees a missing id.
    var engineIds = new List<string>(NumEngines);
    try
    {
        for (int i = 0; i < NumEngines; i++)
        {
            // A fresh build config is assigned for every engine, as in the original test.
            _helperClient.TranslationBuildConfig = new()
            {
                Pretranslate = new List<PretranslateCorpusConfig>(),
                Options = "{\"max_steps\":10}"
            };
            string engineId = await _helperClient.CreateNewEngine("Nmt", "es", "en", $"NMT1_{i}");
            engineIds.Add(engineId);
            var books = new string[] { "MAT.txt", "1JN.txt", "2JN.txt" };
            await _helperClient.AddTextCorpusToEngine(engineId, books, "es", "en", false);
            await _helperClient.AddTextCorpusToEngine(engineId, new string[] { "3JN.txt" }, "es", "en", true);
            await _helperClient.StartBuildAsync(engineId);
            //Ensure that tasks are enqueued roughly in order
            await Task.Delay(500);
        }
        //Wait for at least some tasks to be queued
        await Task.Delay(20_000);

        // Collect a diagnostic dump of every build plus the queue depth for the failure message.
        var buildSummaries = new List<string>(NumEngines + 1);
        foreach (string id in engineIds)
        {
            TranslationBuild build = await _helperClient.translationEnginesClient.GetCurrentBuildAsync(id);
            buildSummaries.Add(JsonSerializer.Serialize(build));
        }

        // Fetched once so the diagnostic message and the assertion below look at the same value.
        int queueSize = (await _helperClient.translationEnginesClient.GetQueueAsync("Nmt")).Size;
        buildSummaries.Add("Depth = " + queueSize.ToString());
        string builds = string.Join("\n", buildSummaries);

        //Status message of last started build says that there is at least one job ahead of it in the queue
        // (this variable due to how many jobs may already exist in the production queue from other Serval instances)
        TranslationBuild newestEngineCurrentBuild = await _helperClient.translationEnginesClient.GetCurrentBuildAsync(
            engineIds[NumEngines - 1]
        );
        Assert.NotNull(newestEngineCurrentBuild.QueueDepth, JsonSerializer.Serialize(newestEngineCurrentBuild));
        Assert.Multiple(() =>
        {
            Assert.That(newestEngineCurrentBuild.QueueDepth, Is.GreaterThan(0), message: builds);
            Assert.That(queueSize, Is.GreaterThanOrEqualTo(NumEngines - NumWorkers));
        });
    }
    finally
    {
        foreach (string id in engineIds)
        {
            try
            {
                await _helperClient.translationEnginesClient.CancelBuildAsync(id);
            }
            catch
            {
                // Best-effort cleanup: a failed cancel (for example, a build that has already
                // finished) must not mask the test result or stop the remaining cancels.
            }
        }
    }
}

[Test]
public async Task NmtLargeBatch()
{
Expand Down

0 comments on commit ba5855e

Please sign in to comment.