Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup builds & engines #510

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Serval/src/Serval.Shared/Models/IInitializableEntity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Shared.Models;

public interface IInitializableEntity : IEntity
{
bool? IsInitialized { get; set; }
DateTime? DateCreated { get; set; }
}
2 changes: 1 addition & 1 deletion src/Serval/src/Serval.Shared/Services/EntityServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public abstract class EntityServiceBase<T>(IRepository<T> entities)
{
protected IRepository<T> Entities { get; } = entities;

public async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
public virtual async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
{
T? entity = await Entities.GetAsync(id, cancellationToken);
if (entity is null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Extensions.DependencyInjection;
using SIL.ServiceToolkit.Services;

namespace Serval.Shared.Services;

public abstract class UninitializedCleanupService<T>(
IServiceProvider services,
ILogger<UninitializedCleanupService<T>> logger,
TimeSpan? timeout = null
) : RecurrentTask($"{typeof(T)} Cleanup Service", services, RefreshPeriod, logger)
where T : IInitializableEntity
{
private readonly ILogger<UninitializedCleanupService<T>> _logger = logger;
private readonly TimeSpan _timeout = timeout ?? TimeSpan.FromMinutes(2);
private static readonly TimeSpan RefreshPeriod = TimeSpan.FromDays(1);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
_logger.LogInformation("Running build cleanup job");
var entities = scope.ServiceProvider.GetRequiredService<IRepository<T>>();
await CheckEntitiesAsync(entities, cancellationToken);
}

public async Task CheckEntitiesAsync(IRepository<T> entities, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
IEnumerable<T> uninitializedEntities = await entities.GetAllAsync(
e =>
e.DateCreated != null
&& e.DateCreated < now - _timeout
&& e.IsInitialized != null
&& !e.IsInitialized.Value,
cancellationToken
);

foreach (T entity in uninitializedEntities)
{
_logger.LogInformation(
"Deleting {type} {id} because it was never successfully initialized.",
typeof(T),
entity.Id
);
await DeleteEntityAsync(entities, entity, cancellationToken);
}
}

protected virtual async Task DeleteEntityAsync(
IRepository<T> entities,
T entity,
CancellationToken cancellationToken
)
{
await entities.DeleteAsync(e => e.Id == entity.Id, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ this IMongoDataAccessConfigurator configurator
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.Owner))
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.DateCreated))
);
}
);
configurator.AddRepository<Build>(
"translation.builds",
init: c =>
c.Indexes.CreateOrUpdateAsync(
init: async c =>
{
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.EngineRef))
)
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.DateCreated))
);
}
);
configurator.AddRepository<Pretranslation>(
"translation.pretranslations",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public static IServalBuilder AddTranslation(this IServalBuilder builder)
builder.Services.AddScoped<IPretranslationService, PretranslationService>();
builder.Services.AddScoped<IEngineService, EngineService>();

builder.Services.AddSingleton<EngineCleanupService>();
builder.Services.AddSingleton<BuildCleanupService>();

var translationOptions = new TranslationOptions();
builder.Configuration.GetSection(TranslationOptions.Key).Bind(translationOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,8 @@ private Engine Map(TranslationEngineConfigDto source)
Type = source.Type.ToPascalCase(),
Owner = Owner,
Corpora = [],
IsModelPersisted = source.IsModelPersisted
IsModelPersisted = source.IsModelPersisted,
IsInitialized = false
};
}

Expand All @@ -1331,7 +1332,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source, string
Pretranslate = Map(engine, source.Pretranslate),
TrainOn = Map(engine, source.TrainOn),
Options = Map(source.Options),
DeploymentVersion = deploymentVersion
DeploymentVersion = deploymentVersion,
IsInitialized = false
};
}

Expand Down
4 changes: 3 additions & 1 deletion src/Serval/src/Serval.Translation/Models/Build.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Build : IEntity
public record Build : IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -16,4 +16,6 @@ public record Build : IEntity
public DateTime? DateFinished { get; init; }
public IReadOnlyDictionary<string, object>? Options { get; init; }
public string? DeploymentVersion { get; init; }
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
4 changes: 3 additions & 1 deletion src/Serval/src/Serval.Translation/Models/Engine.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Engine : IOwnedEntity
public record Engine : IOwnedEntity, IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -16,4 +16,6 @@ public record Engine : IOwnedEntity
public int ModelRevision { get; init; }
public double Confidence { get; init; }
public int CorpusSize { get; init; }
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class BuildCleanupService(
IServiceProvider services,
ILogger<BuildCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Build>(services, logger, timeout) { }
32 changes: 28 additions & 4 deletions src/Serval/src/Serval.Translation/Services/BuildService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,30 @@ public class BuildService(IRepository<Build> builds) : EntityServiceBase<Build>(
{
public async Task<IEnumerable<Build>> GetAllAsync(string parentId, CancellationToken cancellationToken = default)
{
return await Entities.GetAllAsync(e => e.EngineRef == parentId, cancellationToken);
return await Entities.GetAllAsync(
e => e.EngineRef == parentId && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public override async Task<Build> GetAsync(string id, CancellationToken cancellationToken = default)
{
Build? build = await Entities.GetAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
if (build == null)
throw new EntityNotFoundException($"Could not find the {typeof(Build).Name} '{id}'.");
return build;
}

public Task<Build?> GetActiveAsync(string parentId, CancellationToken cancellationToken = default)
{
return Entities.GetAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
cancellationToken
);
}
Expand All @@ -21,7 +38,11 @@ public Task<EntityChange<Build>> GetNewerRevisionAsync(
CancellationToken cancellationToken = default
)
{
return GetNewerRevisionAsync(e => e.Id == id, minRevision, cancellationToken);
return GetNewerRevisionAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
minRevision,
cancellationToken
);
}

public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
Expand All @@ -31,7 +52,10 @@ public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
)
{
return GetNewerRevisionAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
minRevision,
cancellationToken
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class EngineCleanupService(
IServiceProvider services,
ILogger<EngineCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Engine>(services, logger, timeout) { }
66 changes: 51 additions & 15 deletions src/Serval/src/Serval.Translation/Services/EngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ IScriptureDataFileService scriptureDataFileService
private readonly ILogger<EngineService> _logger = loggerFactory.CreateLogger<EngineService>();
private readonly IScriptureDataFileService _scriptureDataFileService = scriptureDataFileService;

public override async Task<Engine> GetAsync(string id, CancellationToken cancellationToken = default)
{
Engine engine = await base.GetAsync(id, cancellationToken);
if (!(engine.IsInitialized ?? true))
throw new EntityNotFoundException($"Could not find the {typeof(Engine).Name} '{id}'.");
return engine;
}

public override async Task<IEnumerable<Engine>> GetAllAsync(
string owner,
CancellationToken cancellationToken = default
)
{
return await Entities.GetAllAsync(
e => e.Owner == owner && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public async Task<Models.TranslationResult> TranslateAsync(
string engineId,
string segment,
Expand Down Expand Up @@ -120,9 +139,9 @@ await client.TrainSegmentPairAsync(

public override async Task<Engine> CreateAsync(Engine engine, CancellationToken cancellationToken = default)
{
bool updateIsModelPersisted = engine.IsModelPersisted is null;
try
{
engine.DateCreated = DateTime.UtcNow;
await Entities.InsertAsync(engine, cancellationToken);
TranslationEngineApi.TranslationEngineApiClient? client =
_grpcClientFactory.CreateClient<TranslationEngineApi.TranslationEngineApiClient>(engine.Type);
Expand All @@ -146,6 +165,15 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
{
IsModelPersisted = createResponse.IsModelPersisted
};
await Entities.UpdateAsync(
engine,
u =>
{
u.Set(e => e.IsInitialized, true);
u.Set(e => e.IsModelPersisted, engine.IsModelPersisted);
},
cancellationToken: CancellationToken.None
);
}
catch (RpcException rpcex)
{
Expand All @@ -164,14 +192,6 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
await Entities.DeleteAsync(engine, CancellationToken.None);
throw;
}
if (updateIsModelPersisted)
{
await Entities.UpdateAsync(
engine,
u => u.Set(e => e.IsModelPersisted, engine.IsModelPersisted),
cancellationToken: cancellationToken
);
}
return engine;
}

Expand Down Expand Up @@ -216,6 +236,7 @@ private Dictionary<string, List<int>> GetChapters(string fileLocation, string sc

public async Task StartBuildAsync(Build build, CancellationToken cancellationToken = default)
{
build.DateCreated = DateTime.UtcNow;
Engine engine = await GetAsync(build.EngineRef, cancellationToken);
await _builds.InsertAsync(build, cancellationToken);

Expand Down Expand Up @@ -325,6 +346,11 @@ pretranslate is null
_logger.LogInformation("{request}", JsonSerializer.Serialize(request));
}
await client.StartBuildAsync(request, cancellationToken: cancellationToken);
await _builds.UpdateAsync(
b => b.Id == build.Id,
u => u.Set(e => e.IsInitialized, true),
cancellationToken: CancellationToken.None
);
}
catch
{
Expand Down Expand Up @@ -382,7 +408,11 @@ public async Task<ModelDownloadUrl> GetModelDownloadUrlAsync(

public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationToken cancellationToken = default)
{
return Entities.UpdateAsync(engineId, u => u.Add(e => e.Corpora, corpus), cancellationToken: cancellationToken);
return Entities.UpdateAsync(
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.Corpora, corpus),
cancellationToken: cancellationToken
);
}

public async Task<Models.Corpus> UpdateCorpusAsync(
Expand All @@ -394,7 +424,10 @@ public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationTo
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.Corpora.Any(c => c.Id == corpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.Corpora.Any(c => c.Id == corpusId),
u =>
{
if (sourceFiles is not null)
Expand All @@ -421,7 +454,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.Corpora, c => c.Id == corpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down Expand Up @@ -456,7 +489,7 @@ public Task AddParallelCorpusAsync(
)
{
return Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.ParallelCorpora, corpus),
cancellationToken: cancellationToken
);
Expand All @@ -471,7 +504,10 @@ public Task AddParallelCorpusAsync(
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
u =>
{
if (sourceCorpora is not null)
Expand Down Expand Up @@ -502,7 +538,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.ParallelCorpora, c => c.Id == parallelCorpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down
Loading
Loading