Skip to content

Commit

Permalink
Add cleanup services
Browse files Browse the repository at this point in the history
Create parent class and initializable interface
Collapse query
Move cleanup service to serval.shared
  • Loading branch information
Enkidu93 authored and johnml1135 committed Dec 4, 2024
1 parent 69fa745 commit d376ec5
Show file tree
Hide file tree
Showing 14 changed files with 298 additions and 27 deletions.
7 changes: 7 additions & 0 deletions src/Serval/src/Serval.Shared/Models/IInitializableEntity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Shared.Models;

public interface IInitializableEntity : IEntity
{
bool? IsInitialized { get; set; }
DateTime? DateCreated { get; set; }
}
2 changes: 1 addition & 1 deletion src/Serval/src/Serval.Shared/Services/EntityServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public abstract class EntityServiceBase<T>(IRepository<T> entities)
{
protected IRepository<T> Entities { get; } = entities;

public async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
public virtual async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
{
T? entity = await Entities.GetAsync(id, cancellationToken);
if (entity is null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Extensions.DependencyInjection;
using SIL.ServiceToolkit.Services;

namespace Serval.Shared.Services;

public abstract class UninitializedCleanupService<T>(
IServiceProvider services,
ILogger<UninitializedCleanupService<T>> logger,
TimeSpan? timeout = null
) : RecurrentTask($"{typeof(T)} Cleanup Service", services, RefreshPeriod, logger)
where T : IInitializableEntity
{
private readonly ILogger<UninitializedCleanupService<T>> _logger = logger;
private readonly TimeSpan _timeout = timeout ?? TimeSpan.FromMinutes(2);
private static readonly TimeSpan RefreshPeriod = TimeSpan.FromDays(1);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
_logger.LogInformation("Running build cleanup job");
var entities = scope.ServiceProvider.GetRequiredService<IRepository<T>>();
await CheckEntitiesAsync(entities, cancellationToken);
}

public async Task CheckEntitiesAsync(IRepository<T> entities, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
IEnumerable<T> uninitializedEntities = await entities.GetAllAsync(
e =>
e.DateCreated != null
&& e.DateCreated < now - _timeout
&& e.IsInitialized != null
&& !e.IsInitialized.Value,
cancellationToken
);

foreach (T entity in uninitializedEntities)
{
_logger.LogInformation(
"Deleting {type} {id} because it was never successfully initialized.",
typeof(T),
entity.Id
);
await DeleteEntityAsync(entities, entity, cancellationToken);
}
}

protected virtual async Task DeleteEntityAsync(
IRepository<T> entities,
T entity,
CancellationToken cancellationToken
)
{
await entities.DeleteAsync(e => e.Id == entity.Id, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ this IMongoDataAccessConfigurator configurator
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.Owner))
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.DateCreated))
);
}
);
configurator.AddRepository<Build>(
"translation.builds",
init: c =>
c.Indexes.CreateOrUpdateAsync(
init: async c =>
{
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.EngineRef))
)
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.DateCreated))
);
}
);
configurator.AddRepository<Pretranslation>(
"translation.pretranslations",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public static IServalBuilder AddTranslation(this IServalBuilder builder)
builder.Services.AddScoped<IPretranslationService, PretranslationService>();
builder.Services.AddScoped<IEngineService, EngineService>();

builder.Services.AddSingleton<EngineCleanupService>();
builder.Services.AddSingleton<BuildCleanupService>();

var translationOptions = new TranslationOptions();
builder.Configuration.GetSection(TranslationOptions.Key).Bind(translationOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,8 @@ private Engine Map(TranslationEngineConfigDto source)
Type = source.Type.ToPascalCase(),
Owner = Owner,
Corpora = [],
IsModelPersisted = source.IsModelPersisted
IsModelPersisted = source.IsModelPersisted,
IsInitialized = false
};
}

Expand All @@ -1331,7 +1332,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source, string
Pretranslate = Map(engine, source.Pretranslate),
TrainOn = Map(engine, source.TrainOn),
Options = Map(source.Options),
DeploymentVersion = deploymentVersion
DeploymentVersion = deploymentVersion,
IsInitialized = false
};
}

Expand Down
4 changes: 3 additions & 1 deletion src/Serval/src/Serval.Translation/Models/Build.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Build : IEntity
public record Build : IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -16,4 +16,6 @@ public record Build : IEntity
public DateTime? DateFinished { get; init; }
public IReadOnlyDictionary<string, object>? Options { get; init; }
public string? DeploymentVersion { get; init; }
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
4 changes: 3 additions & 1 deletion src/Serval/src/Serval.Translation/Models/Engine.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Engine : IOwnedEntity
public record Engine : IOwnedEntity, IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -16,4 +16,6 @@ public record Engine : IOwnedEntity
public int ModelRevision { get; init; }
public double Confidence { get; init; }
public int CorpusSize { get; init; }
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class BuildCleanupService(
IServiceProvider services,
ILogger<BuildCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Build>(services, logger, timeout) { }
32 changes: 28 additions & 4 deletions src/Serval/src/Serval.Translation/Services/BuildService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,30 @@ public class BuildService(IRepository<Build> builds) : EntityServiceBase<Build>(
{
public async Task<IEnumerable<Build>> GetAllAsync(string parentId, CancellationToken cancellationToken = default)
{
return await Entities.GetAllAsync(e => e.EngineRef == parentId, cancellationToken);
return await Entities.GetAllAsync(
e => e.EngineRef == parentId && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public override async Task<Build> GetAsync(string id, CancellationToken cancellationToken = default)
{
Build? build = await Entities.GetAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
if (build == null)
throw new EntityNotFoundException($"Could not find the {typeof(Build).Name} '{id}'.");
return build;
}

public Task<Build?> GetActiveAsync(string parentId, CancellationToken cancellationToken = default)
{
return Entities.GetAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
cancellationToken
);
}
Expand All @@ -21,7 +38,11 @@ public Task<EntityChange<Build>> GetNewerRevisionAsync(
CancellationToken cancellationToken = default
)
{
return GetNewerRevisionAsync(e => e.Id == id, minRevision, cancellationToken);
return GetNewerRevisionAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
minRevision,
cancellationToken
);
}

public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
Expand All @@ -31,7 +52,10 @@ public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
)
{
return GetNewerRevisionAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
minRevision,
cancellationToken
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class EngineCleanupService(
IServiceProvider services,
ILogger<EngineCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Engine>(services, logger, timeout) { }
66 changes: 51 additions & 15 deletions src/Serval/src/Serval.Translation/Services/EngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ IScriptureDataFileService scriptureDataFileService
private readonly ILogger<EngineService> _logger = loggerFactory.CreateLogger<EngineService>();
private readonly IScriptureDataFileService _scriptureDataFileService = scriptureDataFileService;

public override async Task<Engine> GetAsync(string id, CancellationToken cancellationToken = default)
{
Engine engine = await base.GetAsync(id, cancellationToken);
if (!(engine.IsInitialized ?? true))
throw new EntityNotFoundException($"Could not find the {typeof(Engine).Name} '{id}'.");
return engine;
}

public override async Task<IEnumerable<Engine>> GetAllAsync(
string owner,
CancellationToken cancellationToken = default
)
{
return await Entities.GetAllAsync(
e => e.Owner == owner && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public async Task<Models.TranslationResult> TranslateAsync(
string engineId,
string segment,
Expand Down Expand Up @@ -120,9 +139,9 @@ await client.TrainSegmentPairAsync(

public override async Task<Engine> CreateAsync(Engine engine, CancellationToken cancellationToken = default)
{
bool updateIsModelPersisted = engine.IsModelPersisted is null;
try
{
engine.DateCreated = DateTime.UtcNow;
await Entities.InsertAsync(engine, cancellationToken);
TranslationEngineApi.TranslationEngineApiClient? client =
_grpcClientFactory.CreateClient<TranslationEngineApi.TranslationEngineApiClient>(engine.Type);
Expand All @@ -146,6 +165,15 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
{
IsModelPersisted = createResponse.IsModelPersisted
};
await Entities.UpdateAsync(
engine,
u =>
{
u.Set(e => e.IsInitialized, true);
u.Set(e => e.IsModelPersisted, engine.IsModelPersisted);
},
cancellationToken: CancellationToken.None
);
}
catch (RpcException rpcex)
{
Expand All @@ -164,14 +192,6 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
await Entities.DeleteAsync(engine, CancellationToken.None);
throw;
}
if (updateIsModelPersisted)
{
await Entities.UpdateAsync(
engine,
u => u.Set(e => e.IsModelPersisted, engine.IsModelPersisted),
cancellationToken: cancellationToken
);
}
return engine;
}

Expand Down Expand Up @@ -216,6 +236,7 @@ private Dictionary<string, List<int>> GetChapters(string fileLocation, string sc

public async Task StartBuildAsync(Build build, CancellationToken cancellationToken = default)
{
build.DateCreated = DateTime.UtcNow;
Engine engine = await GetAsync(build.EngineRef, cancellationToken);
await _builds.InsertAsync(build, cancellationToken);

Expand Down Expand Up @@ -325,6 +346,11 @@ pretranslate is null
_logger.LogInformation("{request}", JsonSerializer.Serialize(request));
}
await client.StartBuildAsync(request, cancellationToken: cancellationToken);
await _builds.UpdateAsync(
b => b.Id == build.Id,
u => u.Set(e => e.IsInitialized, true),
cancellationToken: CancellationToken.None
);
}
catch
{
Expand Down Expand Up @@ -382,7 +408,11 @@ public async Task<ModelDownloadUrl> GetModelDownloadUrlAsync(

public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationToken cancellationToken = default)
{
return Entities.UpdateAsync(engineId, u => u.Add(e => e.Corpora, corpus), cancellationToken: cancellationToken);
return Entities.UpdateAsync(
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.Corpora, corpus),
cancellationToken: cancellationToken
);
}

public async Task<Models.Corpus> UpdateCorpusAsync(
Expand All @@ -394,7 +424,10 @@ public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationTo
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.Corpora.Any(c => c.Id == corpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.Corpora.Any(c => c.Id == corpusId),
u =>
{
if (sourceFiles is not null)
Expand All @@ -421,7 +454,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.Corpora, c => c.Id == corpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down Expand Up @@ -456,7 +489,7 @@ public Task AddParallelCorpusAsync(
)
{
return Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.ParallelCorpora, corpus),
cancellationToken: cancellationToken
);
Expand All @@ -471,7 +504,10 @@ public Task AddParallelCorpusAsync(
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
u =>
{
if (sourceCorpora is not null)
Expand Down Expand Up @@ -502,7 +538,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.ParallelCorpora, c => c.Id == parallelCorpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down
Loading

0 comments on commit d376ec5

Please sign in to comment.