Skip to content

Commit

Permalink
Remove unnecessary LTC messages based on messages tags
Browse files Browse the repository at this point in the history
Only usage of LTC messages is removing tags from in_transit message
queue in the RTI. So, if each federate manages the delivered message
tag queue, they can effectively remove unnecessary LTCs.
  • Loading branch information
byeonggiljun committed Mar 3, 2024
1 parent e0ca671 commit 214a0c4
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 29 deletions.
3 changes: 3 additions & 0 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ int environment_init(
get_event_position, set_event_position, event_matches, print_event);
env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time,
get_event_position, set_event_position, event_matches, print_event);
#ifdef FEDERATED_CENTRALIZED
env->delivered_message_tag_q = pqueue_tag_init(INITIAL_EVENT_QUEUE_SIZE);
#endif

// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
Expand Down
56 changes: 28 additions & 28 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,10 @@ void notify_provisional_tag_advance_grant(scheduling_node_t *e, tag_t tag) {

void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag) {
federate_info_t *fed = GET_FED_INFO(federate_id);
// tag_t min_in_transit_tag = pqueue_tag_peek_tag(fed->in_transit_message_tags);
// if (lf_tag_compare(min_in_transit_tag, next_event_tag) < 0) {
// next_event_tag = min_in_transit_tag;
// }
tag_t min_in_transit_tag = pqueue_tag_peek_tag(fed->in_transit_message_tags);
if (lf_tag_compare(min_in_transit_tag, next_event_tag) < 0) {
next_event_tag = min_in_transit_tag;
}
update_scheduling_node_next_event_tag_locked(&(fed->enclave), next_event_tag);
}

Expand Down Expand Up @@ -488,30 +488,30 @@ void handle_timed_message(federate_info_t *sending_federate, unsigned char *buff
"RTI failed to send message chunks.");
}

// // Record this in-transit message in federate's in-transit message queue.
// if (lf_tag_compare(fed->enclave.completed, intended_tag) < 0) {
// // Add a record of this message to the list of in-transit messages to this federate.
// pqueue_tag_insert_if_no_match(
// fed->in_transit_message_tags,
// intended_tag);
// LF_PRINT_DEBUG(
// "RTI: Adding a message with tag " PRINTF_TAG " to the list of in-transit messages for federate %d.",
// intended_tag.time - lf_time_start(),
// intended_tag.microstep,
// federate_id);
// } else {
// lf_print_error(
// "RTI: Federate %d has already completed tag " PRINTF_TAG
// ", but there is an in-transit message with tag " PRINTF_TAG " from federate %hu. "
// "This is going to cause an STP violation under centralized coordination.",
// federate_id,
// fed->enclave.completed.time - lf_time_start(),
// fed->enclave.completed.microstep,
// intended_tag.time - lf_time_start(),
// intended_tag.microstep,
// sending_federate->enclave.id);
// // FIXME: Drop the federate?
// }
// Record this in-transit message in federate's in-transit message queue.
if (lf_tag_compare(fed->enclave.completed, intended_tag) < 0) {
// Add a record of this message to the list of in-transit messages to this federate.
pqueue_tag_insert_if_no_match(
fed->in_transit_message_tags,
intended_tag);
LF_PRINT_DEBUG(
"RTI: Adding a message with tag " PRINTF_TAG " to the list of in-transit messages for federate %d.",
intended_tag.time - lf_time_start(),
intended_tag.microstep,
federate_id);
} else {
lf_print_error(
"RTI: Federate %d has already completed tag " PRINTF_TAG
", but there is an in-transit message with tag " PRINTF_TAG " from federate %hu. "
"This is going to cause an STP violation under centralized coordination.",
federate_id,
fed->enclave.completed.time - lf_time_start(),
fed->enclave.completed.microstep,
intended_tag.time - lf_time_start(),
intended_tag.microstep,
sending_federate->enclave.id);
// FIXME: Drop the federate?
}

// If the message tag is less than the most recently received NET from the federate,
// then update the federate's next event tag to match the message tag.
Expand Down
14 changes: 13 additions & 1 deletion core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,9 @@ static int handle_tagged_message(int* socket, int fed_id) {

LF_MUTEX_LOCK(&env->mutex);

#ifdef FEDERATED_CENTRALIZED
pqueue_tag_insert_if_no_match(env->delivered_message_tag_q, intended_tag);
#endif
action->trigger->physical_time_of_arrival = time_of_arrival;

// Create a token for the message
Expand Down Expand Up @@ -2269,15 +2272,24 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
}

void lf_latest_tag_complete(tag_t tag_to_send) {
environment_t *env;
_lf_get_environments(&env);
LF_MUTEX_LOCK(&env->mutex);
tag_t earliest_delievered_message_tag = pqueue_tag_peek_tag(env->delivered_message_tag_q);
pqueue_tag_remove_up_to(env->delivered_message_tag_q, tag_to_send);
LF_MUTEX_UNLOCK(&env->mutex);
int compare_with_earliest_delievered_message_tag = lf_tag_compare(earliest_delievered_message_tag, tag_to_send);
int compare_with_last_tag = lf_tag_compare(_fed.last_sent_LTC, tag_to_send);
if (compare_with_last_tag >= 0) {
if (compare_with_last_tag >= 0 || compare_with_earliest_delievered_message_tag > 0) {
return;
}

LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;

}

parse_rti_code_t lf_parse_rti_addr(const char* rti_addr) {
Expand Down
1 change: 1 addition & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
pqueue_tag_t* delivered_message_tag_q;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t *enclave_info;
Expand Down
1 change: 1 addition & 0 deletions include/core/lf_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "modal_models/modes.h" // Modal model support
#include "utils/pqueue.h"
#include "utils/pqueue_tag.h"
#include "lf_token.h"
#include "tag.h"
#include "vector.h"
Expand Down

0 comments on commit 214a0c4

Please sign in to comment.