Skip to content

Commit

Permalink
Merge pull request #390 from lf-lang/event-queue-refactoring
Browse files Browse the repository at this point in the history
Refactoring of event queue
  • Loading branch information
byeonggiljun authored Apr 11, 2024
2 parents ec06c93 + d10375d commit 64c3f01
Show file tree
Hide file tree
Showing 17 changed files with 308 additions and 543 deletions.
67 changes: 46 additions & 21 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@
#include "scheduler.h"
#endif

//////////////////
// Local functions, not intended for use outside this file.

/**
* @brief Callback function to determine whether two events have the same trigger.
* This function is used by event queue and recycle.
* Return 1 if the triggers are identical, 0 otherwise.
* @param event1 A pointer to an event.
* @param event2 A pointer to an event.
*/
static int event_matches(void* event1, void* event2) {
return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger);
}

/**
* @brief Callback function to print information about an event.
* This function is used by event queue and recycle.
* @param element A pointer to an event.
*/
static void print_event(void* event) {
event_t* e = (event_t*)event;
LF_PRINT_DEBUG("tag: " PRINTF_TAG ", trigger: %p, token: %p", e->base.tag.time, e->base.tag.microstep,
(void*)e->trigger, (void*)e->token);
}

/**
* @brief Initialize the threaded part of the environment struct.
*/
Expand All @@ -58,6 +83,7 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
(void)num_workers;
#endif
}

/**
* @brief Initialize the single-threaded-specific parts of the environment struct.
*/
Expand Down Expand Up @@ -127,18 +153,6 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
#endif
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

static void environment_free_threaded(environment_t* env) {
#if !defined(LF_SINGLE_THREADED)
free(env->thread_ids);
Expand Down Expand Up @@ -176,6 +190,9 @@ static void environment_free_federated(environment_t* env) {
#endif
}

//////////////////
// Functions defined in environment.h.

void environment_free(environment_t* env) {
free(env->name);
free(env->timer_triggers);
Expand All @@ -184,16 +201,27 @@ void environment_free(environment_t* env) {
free(env->reset_reactions);
free(env->is_present_fields);
free(env->is_present_fields_abbreviated);
pqueue_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);
pqueue_tag_free(env->event_q);
pqueue_tag_free(env->recycle_q);

environment_free_threaded(env);
environment_free_single_threaded(env);
environment_free_modes(env);
environment_free_federated(env);
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers,
int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions,
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
Expand Down Expand Up @@ -261,12 +289,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->_lf_handle = 1;

// Initialize our priority queues.
env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, pqueue_tag_compare, event_matches, print_event);
env->recycle_q =
pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, event_matches, print_event);

// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
Expand Down
28 changes: 11 additions & 17 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1255,9 +1255,6 @@ static void handle_provisional_tag_advance_grant() {
// (which it should be). Do not do this if the federate has not fully
// started yet.

instant_t dummy_event_time = PTAG.time;
microstep_t dummy_event_relative_microstep = PTAG.microstep;

if (lf_tag_compare(env->current_tag, PTAG) == 0) {
// The current tag can equal the PTAG if we are at the start time
// or if this federate has been able to advance time to the current
Expand All @@ -1281,22 +1278,18 @@ static void handle_provisional_tag_advance_grant() {
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
} else if (PTAG.time == env->current_tag.time) {
// We now know env->current_tag < PTAG, but the times are equal.
// Adjust the microstep for scheduling the dummy event.
dummy_event_relative_microstep -= env->current_tag.microstep;
}
// We now know env->current_tag < PTAG.

if (dummy_event_time != FOREVER) {
// Schedule a dummy event at the specified time and (relative) microstep.
if (PTAG.time != FOREVER) {
// Schedule a dummy event at the specified tag.
LF_PRINT_DEBUG("At tag " PRINTF_TAG ", inserting into the event queue a dummy event "
"with time " PRINTF_TIME " and (relative) microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, dummy_event_time - start_time,
dummy_event_relative_microstep);
// Dummy event points to a NULL trigger and NULL real event.
event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_time, NULL, dummy_event_relative_microstep);
pqueue_insert(env->event_q, dummy);
"with time " PRINTF_TIME " and microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, PTAG.time - start_time,
PTAG.microstep);
// Dummy event points to a NULL trigger.
event_t* dummy = _lf_create_dummy_events(env, PTAG);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_MUTEX_UNLOCK(&env->mutex);
Expand Down Expand Up @@ -2410,8 +2403,9 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// Create a dummy event that will force this federate to advance time and subsequently
// enable progress for downstream federates. Increment the time by ADVANCE_MESSAGE_INTERVAL
// to prevent too frequent dummy events.
event_t* dummy = _lf_create_dummy_events(env, NULL, tag.time + ADVANCE_MESSAGE_INTERVAL, NULL, 0);
pqueue_insert(env->event_q, dummy);
tag_t dummy_event_tag = (tag_t){.time = tag.time + ADVANCE_MESSAGE_INTERVAL, .microstep = tag.microstep};
event_t* dummy = _lf_create_dummy_events(env, dummy_event_tag);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_PRINT_DEBUG("Inserted a dummy event for logical time " PRINTF_TIME ".", tag.time - lf_time_start());
Expand Down
25 changes: 7 additions & 18 deletions core/modal_models/modes.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Forward declaration of functions and variables supplied by reactor_common.c
void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number);
event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
microstep_t offset);
event_t* _lf_create_dummy_events(environment_t* env, tag_t tag);

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -400,28 +399,17 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
event->trigger != NULL) { // History transition to a different mode
// Remaining time that the event would have been waiting before mode was left
instant_t local_remaining_delay =
event->time -
event->base.tag.time -
(state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : lf_time_start());
tag_t current_logical_tag = env->current_tag;

// Reschedule event with original local delay
LF_PRINT_DEBUG("Modes: Re-enqueuing event with a suspended delay of " PRINTF_TIME
" (previous TTH: " PRINTF_TIME ", Mode suspended at: " PRINTF_TIME ").",
local_remaining_delay, event->time, state->next_mode->deactivation_time);
local_remaining_delay, event->base.tag.time, state->next_mode->deactivation_time);
tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay,
.microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)};
_lf_schedule_at_tag(env, event->trigger, schedule_tag, event->token);

// Also schedule events stacked up in super dense time.
event_t* e = event;
while (e->next != NULL) {
schedule_tag.microstep++;
_lf_schedule_at_tag(env, e->next->trigger, schedule_tag, e->next->token);
event_t* tmp = e->next;
e = tmp->next;
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, tmp);
}
}
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, event);
Expand Down Expand Up @@ -490,7 +478,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]

// Retract all events from the event queue that are associated with now inactive modes
if (env->event_q != NULL) {
size_t q_size = pqueue_size(env->event_q);
size_t q_size = pqueue_tag_size(env->event_q);
if (q_size > 0) {
event_t** delayed_removal = (event_t**)calloc(q_size, sizeof(event_t*));
size_t delayed_removal_count = 0;
Expand All @@ -509,7 +497,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
LF_PRINT_DEBUG("Modes: Pulling %zu events from the event queue to suspend them. %d events are now suspended.",
delayed_removal_count, _lf_suspended_events_num);
for (size_t i = 0; i < delayed_removal_count; i++) {
pqueue_remove(env->event_q, delayed_removal[i]);
pqueue_tag_remove(env->event_q, (pqueue_tag_element_t*)(delayed_removal[i]));
}

free(delayed_removal);
Expand All @@ -519,7 +507,8 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
if (env->modes->triggered_reactions_request) {
// Insert a dummy event in the event queue for the next microstep to make
// sure startup/reset reactions (if any) are triggered as soon as possible.
pqueue_insert(env->event_q, _lf_create_dummy_events(env, NULL, env->current_tag.time, NULL, 1));
tag_t dummy_event_tag = (tag_t){.time = env->current_tag.time, .microstep = 1};
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)_lf_create_dummy_events(env, dummy_event_tag));
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ int next(environment_t* env) {
// Enter the critical section and do not leave until we have
// determined which tag to commit to and start invoking reactions for.
LF_CRITICAL_SECTION_ENTER(env);
event_t* event = (event_t*)pqueue_peek(env->event_q);
event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
// pqueue_dump(event_q, event_q->prt);
// If there is no next event and -keepalive has been specified
// on the command line, then we will wait the maximum time possible.
Expand All @@ -231,24 +231,18 @@ int next(environment_t* env) {
lf_set_stop_tag(env, (tag_t){.time = env->current_tag.time, .microstep = env->current_tag.microstep + 1});
}
} else {
next_tag.time = event->time;
// Deduce the microstep
if (next_tag.time == env->current_tag.time) {
next_tag.microstep = env->current_tag.microstep + 1;
} else {
next_tag.microstep = 0;
}
next_tag = event->base.tag;
}

if (lf_is_tag_after_stop_tag(env, next_tag)) {
// Cannot process events after the stop tag.
next_tag = env->stop_tag;
}

LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
// Wait until physical time >= event.time.
int finished_sleep = wait_until(env, next_tag.time);
LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
if (finished_sleep != 0) {
LF_PRINT_DEBUG("***** wait_until was interrupted.");
// Sleep was interrupted. This could happen when a physical action
Expand All @@ -258,10 +252,10 @@ int next(environment_t* env) {
LF_CRITICAL_SECTION_EXIT(env);
return 1;
}
// Advance current time to match that of the first event on the queue.
// Advance current tag to match that of the first event on the queue.
// We can now leave the critical section. Any events that will be added
// to the queue asynchronously will have a later tag than the current one.
_lf_advance_logical_time(env, next_tag.time);
_lf_advance_tag(env, next_tag);

// Trigger shutdown reactions if appropriate.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
Expand Down
Loading

0 comments on commit 64c3f01

Please sign in to comment.