Skip to content

Commit

Permalink
Support basic queue depth endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Enkidu93 committed Oct 10, 2023
1 parent 99875d1 commit 78527a7
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 10 deletions.
5 changes: 5 additions & 0 deletions samples/EchoTranslationEngine/TranslationEngineServiceV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,9 @@ public override Task<GetWordGraphResponse> GetWordGraph(GetWordGraphRequest requ
}
);
}

public override Task<GetQueueDepthResponse> GetQueueDepth(GetQueueDepthRequest request, ServerCallContext context)
{
return Task.FromResult(new GetQueueDepthResponse { Depth = 0 });
}
}
130 changes: 120 additions & 10 deletions src/Serval.Client/Client.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,15 @@ public partial interface ITranslationEnginesClient
/// <exception cref="ServalApiException">A server side error occurred.</exception>
System.Threading.Tasks.Task DeleteAsync(string id, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Get queue depth
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <returns>The queue depth for the specified engine type</returns>
/// <exception cref="ServalApiException">A server side error occurred.</exception>
System.Threading.Tasks.Task<Queue> GetQueueDepthAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Translate a segment of text
Expand Down Expand Up @@ -1011,13 +1020,13 @@ public partial interface ITranslationEnginesClient
/// Get a build job
/// </summary>
/// <remarks>
/// If the `minRevision` is not defined, the current build at whatever state it is
/// If the `minRevision` is not defined, the current build, at whatever state it is,
/// <br/>will be immediately returned. If `minRevision` is defined, Serval will wait for
/// <br/>up to 40 seconds for the engine to build to the `minRevision` specified, else
/// <br/>will timeout.
/// <br/>A use case is to actively query the state of the current build, where the subsequent
/// <br/>request sets the `minRevision` to the returned `revision` + 1. Note: this method
/// <br/>should use request throttling.
/// <br/>request sets the `minRevision` to the returned `revision` + 1 and timeouts are handled gracefully.
/// <br/>Note: this method should use request throttling.
/// </remarks>
/// <param name="id">The translation engine id</param>
/// <param name="buildId">The build job id</param>
Expand All @@ -1031,7 +1040,7 @@ public partial interface ITranslationEnginesClient
/// Get the currently running build job for a translation engine
/// </summary>
/// <remarks>
/// See "Get a Build Job" for details on minimum revision.
/// See documentation on endpoint /translation/engines/{id}/builds/{id} - "Get a Build Job" for details on using `minRevision`.
/// </remarks>
/// <param name="id">The translation engine id</param>
/// <param name="minRevision">The minimum revision</param>
Expand Down Expand Up @@ -1501,6 +1510,99 @@ public string BaseUrl
}
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Get queue depth
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <returns>The queue depth for the specified engine type</returns>
/// <exception cref="ServalApiException">A server side error occurred.</exception>
public virtual async System.Threading.Tasks.Task<Queue> GetQueueDepthAsync(string engineType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken))
{
if (engineType == null)
throw new System.ArgumentNullException("engineType");

var urlBuilder_ = new System.Text.StringBuilder();
urlBuilder_.Append(BaseUrl != null ? BaseUrl.TrimEnd('/') : "").Append("/translation/engines/queues/{engineType}");
urlBuilder_.Replace("{engineType}", System.Uri.EscapeDataString(ConvertToString(engineType, System.Globalization.CultureInfo.InvariantCulture)));

var client_ = _httpClient;
var disposeClient_ = false;
try
{
using (var request_ = new System.Net.Http.HttpRequestMessage())
{
request_.Method = new System.Net.Http.HttpMethod("GET");
request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));

PrepareRequest(client_, request_, urlBuilder_);

var url_ = urlBuilder_.ToString();
request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute);

PrepareRequest(client_, request_, url_);

var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var disposeResponse_ = true;
try
{
var headers_ = System.Linq.Enumerable.ToDictionary(response_.Headers, h_ => h_.Key, h_ => h_.Value);
if (response_.Content != null && response_.Content.Headers != null)
{
foreach (var item_ in response_.Content.Headers)
headers_[item_.Key] = item_.Value;
}

ProcessResponse(client_, response_);

var status_ = (int)response_.StatusCode;
if (status_ == 200)
{
var objectResponse_ = await ReadObjectResponseAsync<Queue>(response_, headers_, cancellationToken).ConfigureAwait(false);
if (objectResponse_.Object == null)
{
throw new ServalApiException("Response was null which was not expected.", status_, objectResponse_.Text, headers_, null);
}
return objectResponse_.Object;
}
else
if (status_ == 401)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The client is not authenticated", status_, responseText_, headers_, null);
}
else
if (status_ == 403)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The authenticated client cannot perform the operation", status_, responseText_, headers_, null);
}
else
if (status_ == 503)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("A necessary service is currently unavailable. Check `/health` for more details. ", status_, responseText_, headers_, null);
}
else
{
var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The HTTP status code of the response was not expected (" + status_ + ").", status_, responseData_, headers_, null);
}
}
finally
{
if (disposeResponse_)
response_.Dispose();
}
}
}
finally
{
if (disposeClient_)
client_.Dispose();
}
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <summary>
/// Translate a segment of text
Expand Down Expand Up @@ -2922,13 +3024,13 @@ public string BaseUrl
/// Get a build job
/// </summary>
/// <remarks>
/// If the `minRevision` is not defined, the current build at whatever state it is
/// If the `minRevision` is not defined, the current build, at whatever state it is,
/// <br/>will be immediately returned. If `minRevision` is defined, Serval will wait for
/// <br/>up to 40 seconds for the engine to build to the `minRevision` specified, else
/// <br/>will timeout.
/// <br/>A use case is to actively query the state of the current build, where the subsequent
/// <br/>request sets the `minRevision` to the returned `revision` + 1. Note: this method
/// <br/>should use request throttling.
/// <br/>request sets the `minRevision` to the returned `revision` + 1 and timeouts are handled gracefully.
/// <br/>Note: this method should use request throttling.
/// </remarks>
/// <param name="id">The translation engine id</param>
/// <param name="buildId">The build job id</param>
Expand Down Expand Up @@ -3020,7 +3122,7 @@ public string BaseUrl
if (status_ == 408)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The long polling request timed out", status_, responseText_, headers_, null);
throw new ServalApiException("The long polling request timed out. This is expected behavior if you\'re using long-polling with the minRevision strategy specified in the docs", status_, responseText_, headers_, null);
}
else
if (status_ == 503)
Expand Down Expand Up @@ -3053,7 +3155,7 @@ public string BaseUrl
/// Get the currently running build job for a translation engine
/// </summary>
/// <remarks>
/// See "Get a Build Job" for details on minimum revision.
/// See documentation on endpoint /translation/engines/{id}/builds/{id} - "Get a Build Job" for details on using `minRevision`.
/// </remarks>
/// <param name="id">The translation engine id</param>
/// <param name="minRevision">The minimum revision</param>
Expand Down Expand Up @@ -3146,7 +3248,7 @@ public string BaseUrl
if (status_ == 408)
{
string responseText_ = ( response_.Content == null ) ? string.Empty : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
throw new ServalApiException("The long polling request timed out. Did you start the build?", status_, responseText_, headers_, null);
throw new ServalApiException("The long polling request timed out. This is expected behavior if you\'re using long-polling with the minRevision strategy specified in the docs", status_, responseText_, headers_, null);
}
else
if (status_ == 503)
Expand Down Expand Up @@ -4003,6 +4105,14 @@ public partial class TranslationEngineConfig

}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")]
public partial class Queue
{
[Newtonsoft.Json.JsonProperty("depth", Required = Newtonsoft.Json.Required.Always)]
public int Depth { get; set; } = default!;

}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "13.18.2.0 (NJsonSchema v10.8.0.0 (Newtonsoft.Json v13.0.0.0))")]
public partial class TranslationResult
{
Expand Down
9 changes: 9 additions & 0 deletions src/Serval.Grpc/Protos/serval/translation/v1/engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service TranslationEngineApi {
rpc TrainSegmentPair(TrainSegmentPairRequest) returns (google.protobuf.Empty);
rpc StartBuild(StartBuildRequest) returns (google.protobuf.Empty);
rpc CancelBuild(CancelBuildRequest) returns (google.protobuf.Empty);
rpc GetQueueDepth(GetQueueDepthRequest) returns (GetQueueDepthResponse);
}

message CreateRequest {
Expand Down Expand Up @@ -69,6 +70,14 @@ message CancelBuildRequest {
string engine_id = 2;
}

message GetQueueDepthRequest {
string engine_type = 1;
}

message GetQueueDepthResponse {
int32 depth = 1;
}

message AlignedWordPair {
int32 source_index = 1;
int32 target_index = 2;
Expand Down
6 changes: 6 additions & 0 deletions src/Serval.Translation/Contracts/QueueDto.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Serval.Translation.Contracts;

public class QueueDto
{
public int Depth { get; set; } = default;
}
32 changes: 32 additions & 0 deletions src/Serval.Translation/Controllers/TranslationEnginesController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,36 @@ public async Task<ActionResult> DeleteAsync([NotNull] string id, CancellationTok
return Ok();
}

/// <summary>
/// Get queue depth
/// </summary>
/// <param name="engineType">A valid engine type: SmtTransfer, Nmt, or Echo</param>
/// <param name="cancellationToken"></param>
/// <response code="200">The queue depth for the specified engine type</response>
/// <response code="401">The client is not authenticated</response>
/// <response code="403">The authenticated client cannot perform the operation</response>
/// <response code="503">A necessary service is currently unavailable. Check `/health` for more details. </response>
[Authorize(Scopes.ReadTranslationEngines)]
[HttpGet("queues/{engineType}")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(typeof(void), StatusCodes.Status401Unauthorized)]
[ProducesResponseType(typeof(void), StatusCodes.Status403Forbidden)]
[ProducesResponseType(typeof(void), StatusCodes.Status503ServiceUnavailable)]
public async Task<ActionResult<QueueDto>> GetQueueDepth(
[NotNull] string engineType,
CancellationToken cancellationToken
)
{
try
{
return Map(await _engineService.GetQueueDepthAsync(engineType, cancellationToken));
}
catch (InvalidOperationException ioe)
{
return BadRequest(ioe.Message);
}
}

/// <summary>
/// Translate a segment of text
/// </summary>
Expand Down Expand Up @@ -996,6 +1026,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source)
return build;
}

private QueueDto Map(Queue source) => new() { Depth = source.Depth };

private TranslationEngineDto Map(Engine source)
{
return new TranslationEngineDto
Expand Down
6 changes: 6 additions & 0 deletions src/Serval.Translation/Models/Queue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Serval.Translation.Models;

public class Queue
{
public int Depth { get; set; } = default;
}
10 changes: 10 additions & 0 deletions src/Serval.Translation/Services/EngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ public Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cance
);
}

public async Task<Queue> GetQueueDepthAsync(string engineType, CancellationToken cancellationToken = default)
{
var client = _grpcClientFactory.CreateClient<TranslationEngineApi.TranslationEngineApiClient>(engineType);
GetQueueDepthResponse response = await client.GetQueueDepthAsync(
new GetQueueDepthRequest { EngineType = engineType },
cancellationToken: cancellationToken
);
return new Queue { Depth = response.Depth };
}

private Models.TranslationResult Map(V1.TranslationResult source)
{
return new Models.TranslationResult
Expand Down
2 changes: 2 additions & 0 deletions src/Serval.Translation/Services/IEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ Task<bool> TrainSegmentPairAsync(
Task<bool> DeleteCorpusAsync(string engineId, string corpusId, CancellationToken cancellationToken = default);

Task DeleteAllCorpusFilesAsync(string dataFileId, CancellationToken cancellationToken = default);

Task<Queue> GetQueueDepthAsync(string engineType, CancellationToken cancellationToken = default);
}

0 comments on commit 78527a7

Please sign in to comment.