Skip to content

Commit

Permalink
Fix #137: Reading for irregular data sources is quite slow because th…
Browse files Browse the repository at this point in the history
…e current algorithm does not fast forward to the next available file
  • Loading branch information
Apollo3zehn committed Aug 26, 2024
1 parent 96f31d5 commit e1b31ac
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
5 changes: 5 additions & 0 deletions src/Nexus.Sources.StructuredFile/DateTimeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ public static DateTime RoundDown(this DateTime dateTime, TimeSpan timeSpan)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % timeSpan.Ticks), dateTime.Kind);
}

public static TimeSpan RoundDown(this TimeSpan timespan, TimeSpan timeSpan2)
{
return new TimeSpan(timespan.Ticks - (timespan.Ticks % timeSpan2.Ticks));
}
}
54 changes: 40 additions & 14 deletions src/Nexus.Sources.StructuredFile/StructuredFileDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,24 @@ protected virtual async Task ReadAsync(
var consumedFilePeriod = currentBegin - regularUtcFileBegin;
var remainingFilePeriod = fileSource.FilePeriod - consumedFilePeriod;
var currentPeriod = TimeSpan.FromTicks(Math.Min(remainingFilePeriod.Ticks, remainingPeriod.Ticks));

Logger.LogTrace("Process period {CurrentBegin} to {CurrentEnd}", currentBegin, currentBegin + currentPeriod);

var fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks);
var fileOffset = consumedFilePeriod.Ticks / samplePeriod.Ticks;

foreach (var (filePath, fileBeginOffset) in fileInfos)
{
/* This happens when FindFileBeginAndPathsAsync did not find files for the current time period */
if (fileBeginOffset >= fileSource.FilePeriod)
{
/* Modify loop status update variables and leave the loop */
currentPeriod = fileBeginOffset.RoundDown(fileSource.FilePeriod);
fileBlock = (int)(currentPeriod.Ticks / samplePeriod.Ticks);

break;
}

if (File.Exists(filePath))
{
// compensate offsets and lengths in case of incomplete or irregular file
Expand All @@ -423,7 +433,7 @@ protected virtual async Task ReadAsync(
? 0

/* The irregular or incomplete file contains no data for the current buffer position, so compensate for it */
: + fileCompensation
: +fileCompensation
);

/* The irregular or incomplete file contains not enough data, so make the file block smaller */
Expand All @@ -435,15 +445,15 @@ protected virtual async Task ReadAsync(
fileCompensation < 0 /* = irregular file */

/* The irregular file starts earlier than expected, so compensate for it */
? - fileCompensation
? -fileCompensation

/* The irregular or incomplete file starts later than expected */
: - fileCompensation
: -fileCompensation
);

/* The maximum value for fileCompensation is MaxFileBlock = FilePeriod / SamplePeriod
* so there is no need to check for actualFileOffset >= MaxFileBlock.
* However, it might happen that actualFileOffset < 0. This must be compensated. */
* so there is no need to check for actualFileOffset >= MaxFileBlock.
* However, it might happen that actualFileOffset < 0. This must be compensated. */
if (actualFileOffset < 0)
actualFileOffset = 0;

Expand All @@ -458,13 +468,13 @@ protected virtual async Task ReadAsync(

var slicedData = request.Data
.Slice(
start: actualBufferOffset * representation.ElementSize,
start: actualBufferOffset * representation.ElementSize,
length: actualFileBlock * representation.ElementSize
);

var slicedStatus = request.Status
.Slice(
start: actualBufferOffset,
start: actualBufferOffset,
length: actualFileBlock
);

Expand Down Expand Up @@ -534,7 +544,7 @@ protected virtual async Task ReadAsync(
Logger.LogError(ex, "Could not read file source group");
}

++fileSourceGroupIndex;
fileSourceGroupIndex++;
}
}

Expand All @@ -550,7 +560,7 @@ protected abstract Task ReadAsync(
StructuredFileReadRequest[] readRequests,
CancellationToken cancellationToken);

private protected Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)>
private protected Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])>
FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource)
{
/* This implementation assumes that files are stored in regular time intervals.
Expand Down Expand Up @@ -588,7 +598,7 @@ protected abstract Task ReadAsync(
fileSource.UtcOffset
).UtcDateTime;

IEnumerable<(string FilePath, TimeSpan FileBeginOffset)> fileInfos;
(string FilePath, TimeSpan FileBeginOffset)[] fileInfos;

if (fileSource.FileTemplate.Contains('?') || fileSource.FileTemplate.Contains('*'))
{
Expand All @@ -598,14 +608,18 @@ protected abstract Task ReadAsync(

var regularUtcFileEnd = regularUtcFileBegin + fileSource.FilePeriod;

fileInfos = GetCandidateFiles(
// get all files that match the pattern
var tmpFileInfos = GetCandidateFiles(
rootPath: Root,
begin: regularUtcFileBegin,
end: regularUtcFileBegin + fileSource.FilePeriod,
fileSource,
CancellationToken.None
)
.Where(current =>
.ToArray();

// keep only files for current period
fileInfos = tmpFileInfos.Where(current =>
{
if (fileSource.IrregularTimeInterval)
{
Expand All @@ -621,7 +635,19 @@ protected abstract Task ReadAsync(
current.DateTimeOffset.UtcDateTime < regularUtcFileEnd;
}
})
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin));
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin))
.ToArray();

// no files left for current time period - try to find next file after this period
if (!fileInfos.Any())
{
fileInfos = tmpFileInfos
.OrderBy(x => x.DateTimeOffset.DateTime)
.Where(current => regularUtcFileEnd <= current.DateTimeOffset.UtcDateTime)
.Select(current => (current.FilePath, current.DateTimeOffset.UtcDateTime - regularUtcFileBegin))
.Take(1)
.ToArray();
}
}

else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public StructuredFileDataSourceTester(

public Dictionary<string, Dictionary<string, IReadOnlyList<FileSource>>> Config { get; private set; } = default!;

public new Task<(DateTime RegularUtcFileBegin, IEnumerable<(string FilePath, TimeSpan FileBeginOffset)>)>
public new Task<(DateTime RegularUtcFileBegin, (string FilePath, TimeSpan FileBeginOffset)[])>
FindFileBeginAndPathsAsync(DateTime begin, FileSource fileSource)
{
return base.FindFileBeginAndPathsAsync(begin, fileSource);
Expand Down

0 comments on commit e1b31ac

Please sign in to comment.