Commit: Open index in the background

lalinsky committed Dec 7, 2024
1 parent 9ac89b2 commit f03252b
Showing 6 changed files with 96 additions and 76 deletions.
135 changes: 81 additions & 54 deletions src/Index.zig
@@ -51,23 +51,13 @@ dir: std.fs.Dir,

oplog: Oplog,

memory_segments: SegmentListManager(MemorySegment),
file_segments: SegmentListManager(FileSegment),
open_lock: std.Thread.Mutex = .{},
is_ready: std.atomic.Value(bool),
load_task: ?Scheduler.Task = null,

// These segments are owned by the index and can't be accessed without acquiring segments_lock.
// They can never be modified, only replaced.
segments_lock: std.Thread.RwLock = .{},

// These locks give partial access to the respective segments list.
// 1) For memory_segments, a new segment can be appended to the list without this lock.
// 2) For file_segments, no write operation can happen without this lock.
// These locks can only be acquired before segments_lock, never after, to avoid deadlock situations.
// They are mostly useful for allowing read access to segments during merge/checkpoint, without blocking real-time updates.
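// For example, a checkpoint that replaces segments in both lists would
// take the per-list lock first and segments_lock second (a sketch of the
// ordering rule above, not verbatim from this file):
//
//   self.file_segments_lock.lock();
//   defer self.file_segments_lock.unlock();
//   self.segments_lock.lock();
//   defer self.segments_lock.unlock();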
file_segments_lock: std.Thread.Mutex = .{},
memory_segments_lock: std.Thread.Mutex = .{},

// Mutex used to control linearity of updates.
update_lock: std.Thread.Mutex = .{},
memory_segments: SegmentListManager(MemorySegment),
file_segments: SegmentListManager(FileSegment),

checkpoint_task: ?Scheduler.Task = null,
file_segment_merge_task: ?Scheduler.Task = null,
@@ -123,12 +113,17 @@ pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, parent_dir: std
.segments_lock = .{},
.memory_segments = memory_segments,
.file_segments = file_segments,
.is_ready = std.atomic.Value(bool).init(false),
};
}

pub fn deinit(self: *Self) void {
log.info("closing index {}", .{@intFromPtr(self)});

if (self.load_task) |task| {
self.scheduler.destroyTask(task);
}

if (self.checkpoint_task) |task| {
self.scheduler.destroyTask(task);
}
@@ -148,36 +143,8 @@ pub fn deinit(self: *Self) void {
self.dir.close();
}

fn loadSegments(self: *Self, create: bool) !u64 {
self.segments_lock.lock();
defer self.segments_lock.unlock();

const segment_ids = filefmt.readManifestFile(self.dir, self.allocator) catch |err| {
if (err == error.FileNotFound) {
if (create) {
try self.updateManifestFile(self.file_segments.segments.value);
return 0;
}
return error.IndexNotFound;
}
return err;
};
defer self.allocator.free(segment_ids);
log.info("found {} segments in manifest", .{segment_ids.len});

try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, segment_ids.len);
var last_commit_id: u64 = 0;
for (segment_ids, 1..) |segment_id, i| {
const node = try FileSegmentList.loadSegment(self.allocator, segment_id, .{ .dir = self.dir });
self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
last_commit_id = node.value.info.getLastCommitId();
log.info("loaded segment {} ({}/{})", .{ last_commit_id, i, segment_ids.len });
}
return last_commit_id;
}

fn doCheckpoint(self: *Self) !bool {
var snapshot = self.acquireReader();
var snapshot = try self.acquireReader();
defer self.releaseReader(&snapshot);

const source = snapshot.memory_segments.value.getFirst() orelse return false;
@@ -234,7 +201,7 @@ fn doCheckpoint(self: *Self) !bool {
}

fn updateDocsMetrics(self: *Self) void {
var snapshot = self.acquireReader();
var snapshot = self.acquireReader() catch return;
defer self.releaseReader(&snapshot);

metrics.docs(self.name, snapshot.getNumDocs());
@@ -320,30 +287,81 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
}

pub fn open(self: *Self, create: bool) !void {
const last_commit_id = try self.loadSegments(create);
self.open_lock.lock();
defer self.open_lock.unlock();

if (self.is_ready.load(.monotonic)) {
return;
}

if (self.load_task != null) {
return error.AlreadyOpening;
}

const manifest = filefmt.readManifestFile(self.dir, self.allocator) catch |err| {
if (err == error.FileNotFound) {
if (create) {
try self.updateManifestFile(self.file_segments.segments.value);
try self.load(&.{});
return;
}
return error.IndexNotFound;
}
return err;
};
errdefer self.allocator.free(manifest);

self.load_task = try self.scheduler.createTask(.medium, loadTask, .{ self, manifest });
self.scheduler.scheduleTask(self.load_task.?);
}

fn load(self: *Self, manifest: []SegmentInfo) !void {
defer self.allocator.free(manifest);

log.info("found {} segments in manifest", .{manifest.len});

try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, manifest.len);
var last_commit_id: u64 = 0;
for (manifest, 1..) |segment_id, i| {
const node = try FileSegmentList.loadSegment(self.allocator, segment_id, .{ .dir = self.dir });
self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
last_commit_id = node.value.info.getLastCommitId();
log.info("loaded segment {} ({}/{})", .{ last_commit_id, i, manifest.len });
}

self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
self.memory_segment_merge_task = try self.scheduler.createTask(.high, memorySegmentMergeTask, .{self});
self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, .{self});

try self.oplog.open(last_commit_id + 1, updateInternal, self);
try self.oplog.open(1, updateInternal, self);

log.info("index loaded", .{});

self.is_ready.store(true, .monotonic);
}

fn loadTask(self: *Self, manifest: []SegmentInfo) void {
self.open_lock.lock();
defer self.open_lock.unlock();

self.load(manifest) catch |err| {
log.err("load failed: {}", .{err});
};
}

fn maybeScheduleMemorySegmentMerge(self: *Self) void {
if (self.memory_segments.needsMerge()) {
log.debug("too many memory segments, scheduling merging", .{});
if (self.memory_segment_merge_task) |task| {
log.debug("too many memory segments, scheduling merging", .{});
self.scheduler.scheduleTask(task);
}
}
}

fn maybeScheduleFileSegmentMerge(self: *Self) void {
if (self.file_segments.needsMerge()) {
log.debug("too many file segments, scheduling merging", .{});
if (self.file_segment_merge_task) |task| {
log.debug("too many file segments, scheduling merging", .{});
self.scheduler.scheduleTask(task);
}
}
@@ -352,8 +370,8 @@ fn maybeScheduleFileSegmentMerge(self: *Self) void {
fn maybeScheduleCheckpoint(self: *Self) void {
if (self.memory_segments.segments.value.getFirst()) |first_node| {
if (first_node.value.getSize() >= self.options.min_segment_size) {
log.debug("the first memory segment is too big, scheduling checkpoint", .{});
if (self.checkpoint_task) |task| {
log.debug("the first memory segment is too big, scheduling checkpoint", .{});
self.scheduler.scheduleTask(task);
}
}
Expand All @@ -372,11 +390,18 @@ fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
return null;
}

fn checkIfReady(self: Self) !void {
if (!self.is_ready.load(.monotonic)) {
return error.IndexNotReady;
}
}

pub fn update(self: *Self, changes: []const Change) !void {
try self.checkIfReady();
try self.updateInternal(changes, null);
}

pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !void {
fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !void {
var target = try MemorySegmentList.createSegment(self.allocator, .{});
defer MemorySegmentList.destroySegment(self.allocator, &target);

@@ -400,7 +425,9 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
self.maybeScheduleCheckpoint();
}

pub fn acquireReader(self: *Self) IndexReader {
pub fn acquireReader(self: *Self) !IndexReader {
try self.checkIfReady();

self.segments_lock.lockShared();
defer self.segments_lock.unlockShared();

@@ -416,7 +443,7 @@ pub fn releaseReader(self: *Self, reader: *IndexReader) void {
}

pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
var reader = self.acquireReader();
var reader = try self.acquireReader();
defer self.releaseReader(&reader);

return reader.search(hashes, allocator, deadline);
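With this commit, open() only reads the manifest and schedules a background load task; update(), acquireReader(), and search() return error.IndexNotReady until load() finishes and flips is_ready. A minimal sketch of what a caller now sees — the waitForReady helper and its polling loop are illustrative, not part of this API:

    // Sketch: wait until the background load task has finished.
    fn waitForReady(index: *Index) !void {
        while (true) {
            var reader = index.acquireReader() catch |err| switch (err) {
                error.IndexNotReady => {
                    // Load is still running in the scheduler; back off and retry.
                    std.time.sleep(10 * std.time.ns_per_ms);
                    continue;
                },
                else => return err,
            };
            index.releaseReader(&reader);
            return;
        }
    }

    try index.open(false); // returns immediately; loading continues in the background
    try waitForReady(&index);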
24 changes: 5 additions & 19 deletions src/MultiIndex.zig
@@ -12,8 +12,6 @@ pub const IndexRef = struct {
name: []const u8,
references: usize = 0,
last_used_at: i64 = std.math.minInt(i64),
is_open: bool = false,
lock: std.Thread.Mutex = .{},

pub fn deinit(self: *IndexRef, allocator: std.mem.Allocator) void {
allocator.free(self.name);
@@ -31,16 +29,6 @@ pub const IndexRef = struct {
self.last_used_at = std.time.milliTimestamp();
return self.references == 0;
}

pub fn ensureOpen(self: *IndexRef, create: bool) !void {
self.lock.lock();
defer self.lock.unlock();

if (self.is_open) return;

try self.index.open(create);
self.is_open = true;
}
};

lock: std.Thread.Mutex = .{},
@@ -128,7 +116,7 @@ pub fn releaseIndex(self: *Self, index: *Index) void {
self.releaseIndexRef(@fieldParentPtr("index", index));
}

fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
fn acquireIndex(self: *Self, name: []const u8, create: bool) !*IndexRef {
if (!isValidName(name)) {
return error.InvalidIndexName;
}
@@ -152,26 +140,24 @@ fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
};
errdefer result.value_ptr.index.deinit();

try result.value_ptr.index.open(create);

result.value_ptr.incRef();
return result.value_ptr;
}

pub fn getIndex(self: *Self, name: []const u8) !*Index {
const index_ref = try self.acquireIndex(name);
const index_ref = try self.acquireIndex(name, false);
errdefer self.releaseIndexRef(index_ref);

try index_ref.ensureOpen(false);

return &index_ref.index;
}

pub fn createIndex(self: *Self, name: []const u8) !void {
log.info("creating index {s}", .{name});

const index_ref = try self.acquireIndex(name);
const index_ref = try self.acquireIndex(name, true);
defer self.releaseIndexRef(index_ref);

try index_ref.ensureOpen(true);
}

pub fn deleteIndex(self: *Self, name: []const u8) !void {
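Note how acquireIndex() now takes the create flag and opens the index inline, replacing the removed ensureOpen()/is_open machinery; the call stays cheap because Index.open() defers the heavy lifting to the scheduler. A hedged usage sketch — the multi_index variable and index name are assumed:

    // Look up an existing index; open(create=false) yields error.IndexNotFound
    // if there is no manifest on disk.
    const index = try multi_index.getIndex("fingerprints");
    defer multi_index.releaseIndex(index);

    // First-time creation; open(create=true) writes an empty manifest.
    try multi_index.createIndex("fingerprints");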
1 change: 1 addition & 0 deletions src/main.zig
@@ -76,6 +76,7 @@ pub fn main() !void {
log.info("using {} threads", .{threads});

try metrics.initializeMetrics(allocator, .{ .prefix = "aindex_" });
defer metrics.deinitMetrics();

var scheduler = Scheduler.init(allocator);
defer scheduler.deinit();
4 changes: 4 additions & 0 deletions src/metrics.zig
@@ -49,6 +49,10 @@ pub fn initializeMetrics(allocator: std.mem.Allocator, comptime opts: m.Registry
};
}

pub fn deinitMetrics() void {
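// Note: docs appears to be the only metric that allocates (it is labeled
// by index name), so it is presumably the only one that needs deinit here.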
metrics.docs.deinit();
}

pub fn writeMetrics(writer: anytype) !void {
return m.write(&metrics, writer);
}
6 changes: 3 additions & 3 deletions src/server.zig
@@ -330,7 +330,7 @@ fn handleHeadFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Respons
const index = try getIndex(ctx, req, res, false) orelse return;
defer releaseIndex(ctx, index);

var index_reader = index.acquireReader();
var index_reader = try index.acquireReader();
defer index.releaseReader(&index_reader);

const id = try getId(req, res, false) orelse return;
@@ -351,7 +351,7 @@ fn handleGetFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response
const index = try getIndex(ctx, req, res, true) orelse return;
defer releaseIndex(ctx, index);

var index_reader = index.acquireReader();
var index_reader = try index.acquireReader();
defer index.releaseReader(&index_reader);

const id = try getId(req, res, true) orelse return;
@@ -443,7 +443,7 @@ fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !voi
const index = try getIndex(ctx, req, res, true) orelse return;
defer releaseIndex(ctx, index);

var index_reader = index.acquireReader();
var index_reader = try index.acquireReader();
defer index.releaseReader(&index_reader);

const response = GetIndexResponse{
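Every reader-facing handler now has to handle the case where the index exists but is still loading. A sketch of mapping error.IndexNotReady to a 503 response — the status choice is an assumption, not something this commit does:

    var index_reader = index.acquireReader() catch |err| switch (err) {
        error.IndexNotReady => {
            res.status = 503; // index is still loading in the background
            return;
        },
        else => return err,
    };
    defer index.releaseReader(&index_reader);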
2 changes: 2 additions & 0 deletions src/utils/Scheduler.zig
@@ -71,6 +71,8 @@ pub fn createTask(self: *Self, priority: Priority, comptime func: anytype, args:
const closure = try self.allocator.create(Closure);
errdefer self.allocator.destroy(closure);

closure.arguments = args;

task.* = .{
.data = .{
.priority = priority,
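The one-line Scheduler fix stores the caller's args tuple on the heap-allocated closure; previously the arguments field was never assigned, so a task could fire with undefined arguments. A minimal sketch of the closure pattern being fixed — the Closure shape and run() name are assumptions based on this hunk alone:

    fn Closure(comptime func: anytype, comptime Args: type) type {
        return struct {
            arguments: Args,

            fn run(self: *@This()) void {
                // Without `closure.arguments = args` in createTask, this
                // tuple would be undefined memory at invocation time.
                @call(.auto, func, self.arguments);
            }
        };
    }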
