Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functionallity for snapshot suggestion #185

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions include/raft.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ enum raft_event_type {
RAFT_TIMEOUT, /* The timeout has expired. */
RAFT_SUBMIT, /* New entries have been submitted. */
RAFT_CATCH_UP, /* Start catching-up a server. */
RAFT_TRANSFER /* Start transferring leadership to another server. */
RAFT_TRANSFER, /* Start transferring leadership to another server. */
RAFT_SUGGEST_SNAPSHOT /* Make a snapshot attempt whenever possible */
};

/**
Expand Down Expand Up @@ -564,6 +565,7 @@ struct raft_update
#define RAFT_UPDATE_STATE 1 << 5
#define RAFT_UPDATE_COMMIT_INDEX 1 << 6
#define RAFT_UPDATE_TIMEOUT 1 << 7
#define RAFT_UPDATE_SUGGEST_SNAPSHOT 1 << 8

/**
* State codes.
Expand Down Expand Up @@ -631,6 +633,7 @@ struct raft; /* Forward declaration. */
struct raft_log *log; /* Cache on-disk log */ \
unsigned snapshot_threshold; /* N. of entries before snapshot */ \
unsigned snapshot_trailing; /* N. of entries to retain */ \
struct raft_suggest_snapshot *suggest_snapshot; /* Pending snapshot */ \
} legacy;
#endif /* RAFT__LEGACY_no */

Expand Down Expand Up @@ -1385,8 +1388,10 @@ struct raft_fsm
* has fired.
*/

struct raft_change; /* Forward declaration */
struct raft_transfer; /* Forward declaration */
/* Forward declarations */
struct raft_change;
struct raft_transfer;
struct raft_suggest_snapshot;

/**
* Bootstrap this raft instance using the given configuration. The instance
Expand Down Expand Up @@ -1600,6 +1605,32 @@ RAFT_API int raft_transfer(struct raft *r,
raft_id id,
raft_transfer_cb cb);

/**
* Asynchronous request to perform a snapshot.
*/
typedef void (*raft_suggest_snapshot_cb)(struct raft_suggest_snapshot *req, int status);
struct raft_suggest_snapshot
{
RAFT__REQUEST;
raft_suggest_snapshot_cb cb; /* User callback */
};

/**
* Suggest that the server performs a snapshot. This may or may not be
* possible depending on the state of the server. The result of the operation
* will be reported to the callback function.
*
* The callback is called with a status of either 0, indicating that a snapshot
* was taken, or with status #RAFT_CANCELED, indicating that the operation was
* not performed.
*
* If the function is called again before the callback has been called, the
* error #RAFT_BUSY will be returned.
*/
RAFT_API int raft_suggest_snapshot(struct raft *r,
struct raft_suggest_snapshot *req,
raft_suggest_snapshot_cb cb);

/**
* Return the index of the last entry that was applied to the local FSM.
*/
Expand Down
46 changes: 43 additions & 3 deletions src/legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ static int putSnapshot(struct legacyTakeSnapshot *req)
return rv;
}

static bool legacyShouldTakeSnapshot(const struct raft *r)
static bool legacyShouldTakeSnapshot(const struct raft *r, bool ignore_threshold)
{
/* We currently support only synchronous FSMs, where entries are applied
* synchronously as soon as we advance the commit index, so the two
Expand All @@ -569,7 +569,7 @@ static bool legacyShouldTakeSnapshot(const struct raft *r)

/* If we didn't reach the threshold yet, do nothing. */
if (r->commit_index - r->legacy.log->snapshot.last_index <
r->legacy.snapshot_threshold) {
r->legacy.snapshot_threshold && !ignore_threshold) {
return false;
}

Expand Down Expand Up @@ -1121,7 +1121,16 @@ static int legacyHandleEvent(struct raft *r,
r->legacy.step_cb(r);
}

if (legacyShouldTakeSnapshot(r)) {
if (update.flags & RAFT_UPDATE_SUGGEST_SNAPSHOT) {
assert(r->legacy.suggest_snapshot);
if (legacyShouldTakeSnapshot(r, true)) {
legacyTakeSnapshot(r);
r->legacy.suggest_snapshot->cb(r->legacy.suggest_snapshot, 0);
} else {
r->legacy.suggest_snapshot->cb(r->legacy.suggest_snapshot, RAFT_CANCELED);
}
r->legacy.suggest_snapshot = NULL;
} else if (legacyShouldTakeSnapshot(r, false)) {
legacyTakeSnapshot(r);
}

Expand Down Expand Up @@ -1581,6 +1590,37 @@ int raft_transfer(struct raft *r,
return rv;
}

int raft_suggest_snapshot(struct raft *r,
struct raft_suggest_snapshot *req,
raft_suggest_snapshot_cb cb)
{
struct raft_event event;
int rv;

if (r->legacy.suggest_snapshot) {
rv = RAFT_BUSY;
goto err;
}

event.time = r->io->time(r->io);
event.type = RAFT_SUGGEST_SNAPSHOT;

req->cb = cb;
r->legacy.suggest_snapshot = req;

rv = LegacyForwardToRaftIo(r, &event);
if (rv != 0) {
r->legacy.suggest_snapshot = NULL;
goto err;
}

return 0;

err:
assert(rv != 0);
return rv;
}

int raft_bootstrap(struct raft *r, const struct raft_configuration *conf)
{
int rv;
Expand Down
6 changes: 6 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ int raft_init(struct raft *r,
r->legacy.log = logInit();
r->legacy.snapshot_threshold = DEFAULT_SNAPSHOT_THRESHOLD;
r->legacy.snapshot_trailing = DEFAULT_SNAPSHOT_TRAILING;
r->legacy.suggest_snapshot = NULL;
if (r->legacy.log == NULL) {
goto err_after_address_alloc;
}
Expand Down Expand Up @@ -566,6 +567,11 @@ int raft_step(struct raft *r,
infof("transfer leadership to %llu", event->transfer.server_id);
rv = ClientTransfer(r, event->transfer.server_id);
break;
case RAFT_SUGGEST_SNAPSHOT:
infof("suggesting snapshot to be performed");
r->update->flags |= RAFT_UPDATE_SUGGEST_SNAPSHOT;
rv = 0;
break;
default:
rv = RAFT_INVALID;
break;
Expand Down
Loading