Skip to content

Commit

Permalink
Force checkpoint of a small segment in some cases
Browse files Browse the repository at this point in the history
Fixes #46
  • Loading branch information
lalinsky committed Dec 8, 2024
1 parent c400190 commit 92d3ce1
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 33 deletions.
4 changes: 3 additions & 1 deletion src/FileSegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const KeepOrDelete = common.KeepOrDelete;

const Item = @import("segment.zig").Item;
const SegmentInfo = @import("segment.zig").SegmentInfo;
const SegmentStatus = @import("segment.zig").SegmentStatus;

const filefmt = @import("filefmt.zig");

Expand All @@ -20,6 +21,7 @@ pub const Options = struct {
allocator: std.mem.Allocator,
dir: std.fs.Dir,
info: SegmentInfo = .{},
status: SegmentStatus = .{},
attributes: std.StringHashMapUnmanaged(u64) = .{},
docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
min_doc_id: u32 = 0,
Expand Down Expand Up @@ -158,7 +160,7 @@ test "build" {
defer source.deinit(.delete);

source.info = .{ .version = 1 };
source.frozen = true;
source.status.frozen = true;
try source.docs.put(source.allocator, 1, true);
try source.items.append(source.allocator, .{ .id = 1, .hash = 1 });
try source.items.append(source.allocator, .{ .id = 1, .hash = 2 });
Expand Down
25 changes: 4 additions & 21 deletions src/Index.zig
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,9 @@ pub fn deinit(self: *Self) void {
self.dir.close();
}

fn doCheckpoint(self: *Self) !bool {
var snapshot = try self.acquireReader();
defer self.releaseReader(&snapshot);

const source = snapshot.memory_segments.value.getFirst() orelse return false;
if (source.value.getSize() < self.options.min_segment_size) {
return false;
}
fn checkpoint(self: *Self) !bool {
var source = self.memory_segments.prepareCheckpoint(self.allocator) orelse return false;
defer MemorySegmentList.destroySegment(self.allocator, &source);

// build new file segment

Expand Down Expand Up @@ -207,7 +202,7 @@ fn updateDocsMetrics(self: *Self) void {
}

fn checkpointTask(self: *Self) void {
_ = self.doCheckpoint() catch |err| {
_ = self.checkpoint() catch |err| {
log.err("checkpoint failed: {}", .{err});
};
}
Expand Down Expand Up @@ -377,18 +372,6 @@ fn maybeScheduleCheckpoint(self: *Self) void {
}
}

fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
self.segments_lock.lockShared();
defer self.segments_lock.unlockShared();

if (self.segments.memory_segments.value.getFirstOrNull()) |first_node| {
if (first_node.value.getSize() > self.options.min_segment_size) {
return first_node.acquire();
}
}
return null;
}

pub fn waitForReady(self: *Self, timeout_ms: u32) !void {
try self.is_ready.timedWait(timeout_ms * std.time.us_per_ms);
}
Expand Down
3 changes: 2 additions & 1 deletion src/MemorySegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const common = @import("common.zig");
const SearchResults = common.SearchResults;
const KeepOrDelete = common.KeepOrDelete;
const SegmentInfo = @import("segment.zig").SegmentInfo;
const SegmentStatus = @import("segment.zig").SegmentStatus;
const Item = @import("segment.zig").Item;

const Change = @import("change.zig").Change;
Expand All @@ -19,12 +20,12 @@ pub const Options = struct {};

allocator: std.mem.Allocator,
info: SegmentInfo = .{},
status: SegmentStatus = .{},
attributes: std.StringHashMapUnmanaged(u64) = .{},
docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
min_doc_id: u32 = 0,
max_doc_id: u32 = 0,
items: std.ArrayListUnmanaged(Item) = .{},
frozen: bool = false,

pub fn init(allocator: std.mem.Allocator, opts: Options) Self {
_ = opts;
Expand Down
4 changes: 4 additions & 0 deletions src/segment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub const Item = packed struct(u64) {
}
};

pub const SegmentStatus = struct {
frozen: bool = false,
};

test "Item binary" {
try std.testing.expectEqual(8, @sizeOf(Item));
try std.testing.expectEqual(64, @bitSizeOf(Item));
Expand Down
63 changes: 56 additions & 7 deletions src/segment_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub fn SegmentList(Segment: type) type {
};
}

fn getSegmentSize(comptime T: type) fn (SharedPtr(T)) usize {
fn getSizeFn(comptime T: type) fn (SharedPtr(T)) usize {
const tmp = struct {
fn getSize(segment: SharedPtr(T)) usize {
return segment.value.getSize();
Expand All @@ -202,17 +202,27 @@ fn getSegmentSize(comptime T: type) fn (SharedPtr(T)) usize {
return tmp.getSize;
}

fn isFrozenFn(comptime T: type) fn (SharedPtr(T)) bool {
const tmp = struct {
fn isFrozen(segment: SharedPtr(T)) bool {
return segment.value.status.frozen;
}
};
return tmp.isFrozen;
}

pub fn SegmentListManager(Segment: type) type {
return struct {
pub const Self = @This();
pub const List = SegmentList(Segment);
pub const MergePolicy = TieredMergePolicy(List.Node, getSegmentSize(Segment));
pub const MergePolicy = TieredMergePolicy(List.Node, getSizeFn(Segment), isFrozenFn(Segment));

options: Segment.Options,
segments: SharedPtr(List),
merge_policy: MergePolicy,
num_allowed_segments: std.atomic.Value(usize),
update_lock: std.Thread.Mutex,
status_update_lock: std.Thread.Mutex,

pub fn init(allocator: Allocator, options: Segment.Options, merge_policy: MergePolicy) !Self {
const segments = try SharedPtr(List).create(allocator, List.initEmpty());
Expand All @@ -222,6 +232,7 @@ pub fn SegmentListManager(Segment: type) type {
.merge_policy = merge_policy,
.num_allowed_segments = std.atomic.Value(usize).init(0),
.update_lock = .{},
.status_update_lock = .{},
};
}

Expand Down Expand Up @@ -249,16 +260,49 @@ pub fn SegmentListManager(Segment: type) type {
return self.segments.value.nodes.items.len > self.num_allowed_segments.load(.acquire);
}

pub fn prepareMerge(self: *Self, allocator: Allocator) !?Update {
pub fn prepareCheckpoint(self: *Self, allocator: Allocator) ?List.Node {
var segments = self.acquireSegments();
defer destroySegments(allocator, &segments);

self.num_allowed_segments.store(self.merge_policy.calculateBudget(segments.value.nodes.items), .release);
if (!self.needsMerge()) {
return null;
self.status_update_lock.lock();
defer self.status_update_lock.unlock();

if (segments.value.getFirst()) |node| {
if (node.value.status.frozen) {
return node.acquire();
}
}
return null;
}

const candidate = self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return null;
pub fn prepareMerge(self: *Self, allocator: Allocator) !?Update {
var segments = self.acquireSegments();
defer destroySegments(allocator, &segments);

const candidate = blk: {
self.status_update_lock.lock();
defer self.status_update_lock.unlock();

// Check for a degenerate case:
// - we have a small segment in the front of list and then an oversized one right next to it
// - such a segment could never be merged
// - but it would also never be considered for checkpoint, so it would be stuck there, blocking checkpoints
if (segments.value.nodes.items.len >= 2) {
const node0 = segments.value.nodes.items[0];
const node1 = segments.value.nodes.items[1];

if (node1.value.status.frozen and !node0.value.status.frozen) {
node0.value.status.frozen = true;
}
}

self.num_allowed_segments.store(self.merge_policy.calculateBudget(segments.value.nodes.items), .release);
if (!self.needsMerge()) {
return null;
}

break :blk self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return null;
};

var target = try List.createSegment(allocator, self.options);
defer List.destroySegment(allocator, &target);
Expand All @@ -274,6 +318,11 @@ pub fn SegmentListManager(Segment: type) type {
try target.value.merge(&merger);
errdefer target.value.cleanup();

if (target.value.getSize() > self.merge_policy.max_segment_size) {
// we can do this without a lock, because we are the only ones knowing about this new segment
target.value.status.frozen = true;
}

var update = try self.beginUpdate(allocator);
update.replaceMergedSegment(target);

Expand Down
37 changes: 34 additions & 3 deletions src/segment_merge_policy.zig
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ pub const MergeCandidate = struct {
score: f64 = 0.0,
};

pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) type {
pub fn GetSizeFn(comptime S: type) type {
return fn (S) usize;
}

pub fn IsFrozenFn(comptime S: type) type {
return fn (S) bool;
}

pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: GetSizeFn(Segment), comptime isFrozenFn: IsFrozenFn(Segment)) type {
return struct {
max_segments: ?usize = null,

Expand All @@ -53,6 +61,9 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) ty
var num_oversized_segments: usize = 0;

for (segments) |segment| {
if (isFrozenFn(segment)) {
continue;
}
const size = getSizeFn(segment);
if (size > self.max_segment_size) {
num_oversized_segments += 1;
Expand Down Expand Up @@ -97,6 +108,10 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) ty

var start: usize = 0;
while (start + 1 < segments.len) : (start += 1) {
if (isFrozenFn(segments[start])) {
// Skip frozen segments
continue;
}
const start_size = getSizeFn(segments[start]);
if (start_size > self.max_segment_size) {
// Skip oversized segments that can't be further merged
Expand All @@ -110,10 +125,21 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) ty
};

while (candidate.end < segments.len) {
candidate.size += getSizeFn(segments[candidate.end]);
if (isFrozenFn(segments[candidate.end])) {
// Can't include frozen segments
break;
}
const size = getSizeFn(segments[candidate.end]);
if (size > self.max_segment_size) {
// Can't include oversized segments
continue;
}

candidate.size += size;
candidate.end += 1;

if (candidate.end - candidate.start > self.segments_per_merge or candidate.size > max_merge_size) {
// Too many seegments to merge in one pass, or the result would be too big
break;
}

Expand Down Expand Up @@ -170,6 +196,11 @@ const MockSegment = struct {
pub fn getSize(self: @This()) usize {
return self.size;
}

pub fn isFrozen(self: @This()) bool {
_ = self;
return false;
}
};

fn applyMerge(segments: *std.ArrayList(MockSegment), merge: MergeCandidate) !void {
Expand All @@ -183,7 +214,7 @@ test "TieredMergePolicy" {
var segments = std.ArrayList(MockSegment).init(std.testing.allocator);
defer segments.deinit();

const policy = TieredMergePolicy(MockSegment, MockSegment.getSize){
const policy = TieredMergePolicy(MockSegment, MockSegment.getSize, MockSegment.isFrozen){
.min_segment_size = 100,
.max_segment_size = 100000,
.segments_per_merge = 10,
Expand Down

0 comments on commit 92d3ce1

Please sign in to comment.