From fea2eed540d292b8b015b72163326d40ab4d6c82 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 15 Jul 2024 10:58:00 -0400 Subject: [PATCH 01/40] Support zero-delay cycles --- core/federated/RTI/rti_remote.c | 88 ++++++++++++++-- core/federated/federate.c | 105 ++++++++++++++++++-- include/core/federated/network/net_common.h | 27 ++++- 3 files changed, 197 insertions(+), 23 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 11b7d2f03..0e4616a69 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -289,6 +289,7 @@ static int create_rti_server(uint16_t port, socket_type_t socket_type) { * * In case there is already a grant for that federate, keep the soonest one. * FIXME: Is that correct? + * FIXME: Why not just add it to the queue? * * @param fed The federate. * @param tag The tag to grant. @@ -315,6 +316,7 @@ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provis } else { // FIXME: Decide what to do in this case... // TODO: do it! + // FIXME: Add to the queue? } LF_MUTEX_UNLOCK(&rti_mutex); } @@ -353,7 +355,7 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { int num_absent_upstream_transients = 0; for (int j = 0; j < fed->enclave.num_upstream; j++) { federate_info_t* upstream = GET_FED_INFO(fed->enclave.upstream[j]); - // Do Ignore this enclave if it no longer connected. + // Ignore this enclave if it no longer connected. if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) { num_absent_upstream_transients++; } @@ -361,6 +363,65 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { return num_absent_upstream_transients; } +/** + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified federate. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param disconnected The connected federate. 
+ */ +static void send_upstream_connected_locked( + federate_info_t* destination, federate_info_t* connected) { + if (!connected->is_transient) { + // No need to send connected message for persistent federates. + return; + } + unsigned char buffer[MSG_TYPE_UPSTREAM_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_CONNECTED; + encode_uint16(connected->enclave.id, &buffer[1]); + if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", + connected->enclave.id); + } +} + +/** + * @brief Send MSG_TYPE_UPSTREAM_DISCONNECTED to the specified federate. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param disconnected The disconnected federate. + */ +static void send_upstream_disconnected_locked( + federate_info_t* destination, federate_info_t* disconnected) { + unsigned char buffer[MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_DISCONNECTED; + encode_uint16(disconnected->enclave.id, &buffer[1]); + if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", + disconnected->enclave.id); + } +} + +/** + * @brief Mark a federate as disconnected and inform downstream federates. + * @param e The enclave corresponding to the disconnected federate. + */ +static void notify_federate_disconnected(scheduling_node_t* e) { + e->state = NOT_CONNECTED; + // Notify downstream federates. Need to hold the mutex lock to do this. + LF_MUTEX_LOCK(&rti_mutex); + for (int j = 0; j < e->num_downstream; j++) { + federate_info_t* downstream = GET_FED_INFO(e->downstream[j]); + // Ignore this enclave if it no longer connected. + if (downstream->enclave.state != NOT_CONNECTED) { + // Notify the downstream enclave. 
+ send_upstream_disconnected_locked(downstream, GET_FED_INFO(e->id)); + } + } + LF_MUTEX_UNLOCK(&rti_mutex); +} + /** * Notify a tag advance grant (TAG) message to the specified federate immediately. * @@ -385,7 +446,7 @@ static void notify_tag_advance_grant_immediate(scheduling_node_t* e, tag_t tag) // to fail. Consider a failure here a soft failure and update the federate's status. if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + notify_federate_disconnected(e); } else { e->last_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", e->id, tag.time - start_time, @@ -447,7 +508,7 @@ static void notify_provisional_tag_advance_grant_immediate(scheduling_node_t* e, // to fail. Consider a failure here a soft failure and update the federate's status. if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + notify_federate_disconnected(e); } else { e->last_provisionally_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", e->id, @@ -496,11 +557,12 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } // Check if sending the tag advance grant needs to be delayed or not - // Delay is needed when a federate has, at least one, absent upstream transient + // Delay is needed when a federate has at least one absent upstream transient federate_info_t* fed = GET_FED_INFO(e->id); if (!fed->has_upstream_transient_federates) { notify_provisional_tag_advance_grant_immediate(e, tag); } else { + // FIXME: Isn't this check redundant? 
if (get_num_absent_upstream_transients(fed) > 0) { notify_grant_delayed(fed, tag, true); } else { @@ -1032,7 +1094,10 @@ void handle_address_ad(uint16_t federate_id) { * plus an offset. The federate will then receive identical federation_start_time * and federate_start_tag.time (the federate_start_tag.microstep will be 0). * If, however, the startup phase is passed, the federate will receive different - * values than sateted above. + * values than stated above. + * + * This will also notify federates downstream of my_fed that this federate is now + * connected. This is important when there are zero-delay cycles. * * @param my_fed the federate to send the start time to. * @param federation_start_time the federation start_time @@ -1062,6 +1127,12 @@ void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, ta my_fed->enclave.state = GRANTED; lf_cond_broadcast(&sent_start_time); LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); + + // Notify downstream federates of this now connected transient. + for (int i = 0; i < my_fed->enclave.num_upstream; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.upstream[i]), my_fed); + } + LF_MUTEX_UNLOCK(&rti_mutex); } @@ -1367,7 +1438,7 @@ static void handle_federate_failed(federate_info_t* my_fed) { _lf_federate_reports_error = true; lf_print_error("RTI: Federate %d reports an error and has exited.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + notify_federate_disconnected(&my_fed->enclave); // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; @@ -1413,7 +1484,7 @@ static void handle_federate_resign(federate_info_t* my_fed) { lf_print("RTI: Federate %d has resigned.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + notify_federate_disconnected(&my_fed->enclave); // Indicate that there will no further events from this federate. 
my_fed->enclave.next_event = FOREVER_TAG; @@ -1460,7 +1531,7 @@ void* federate_info_thread_TCP(void* fed) { if (read_failed) { // Socket is closed lf_print_error("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + notify_federate_disconnected(&my_fed->enclave); my_fed->socket = -1; // FIXME: We need better error handling here, but do not stop execution here. break; @@ -2283,6 +2354,7 @@ void* lf_delayed_grants_thread(void* nothing) { federate_info_t* fed = GET_FED_INFO(next->fed_id); if (next->is_provisional) { notify_provisional_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + // FIXME: Send port absent notification to all federates downstream of absent federates. } else { notify_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); } diff --git a/core/federated/federate.c b/core/federated/federate.c index 37ac7a4c1..e9cd57885 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -170,6 +170,8 @@ extern interval_t _lf_action_delay_table[]; extern size_t _lf_action_table_size; extern lf_action_base_t* _lf_zero_delay_cycle_action_table[]; extern size_t _lf_zero_delay_cycle_action_table_size; +extern uint16_t _lf_zero_delay_cycle_upstream_ids[]; +extern bool _lf_zero_delay_cycle_upstream_disconnected[]; extern reaction_t* network_input_reactions[]; extern size_t num_network_input_reactions; extern reaction_t* port_absent_reaction[]; @@ -195,7 +197,7 @@ static lf_action_base_t* action_for_port(int port_id) { /** * Update the last known status tag of all network input ports - * to the value of `tag`, unless that the provided `tag` is less + * to the value of `tag`, unless the provided `tag` is less * than the last_known_status_tag of the port. This is called when * a TAG signal is received from the RTI in centralized coordination. * If any update occurs, then this broadcasts on `lf_port_status_changed`. 
@@ -259,7 +261,7 @@ static void update_last_known_status_on_input_ports(tag_t tag) { * * @param env The top-level environment, whose mutex is assumed to be held. * @param tag The tag on which the latest status of the specified network input port is known. - * @param portID The port ID. + * @param port_id The port ID. */ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id) { if (lf_tag_compare(tag, env->current_tag) < 0) @@ -292,13 +294,41 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag } /** - * Set the status of network port with id portID. + * @brief Update the last known status tag of a network input action. + * + * This function is similar to update_last_known_status_on_input_port, but + * it is called when a PTAG is granted and an upstream transient federate is not + * connected. It updates the last known status tag of the network input action + * so that it will not wait for a message or absent message from the upstream federate. + * + * This function assumes the caller holds the mutex on the top-level environment, + * and, if the tag actually increases, it broadcasts on `lf_port_status_changed`. + * + * @param env The top-level environment, whose mutex is assumed to be held. + * @param action The action associated with the network input port. + * @param tag The tag of the PTAG. 
+ */ +static void update_last_known_status_on_action(environment_t* env, lf_action_base_t* action, tag_t tag) { + if (lf_tag_compare(tag, env->current_tag) < 0) tag = env->current_tag; + trigger_t* input_port_trigger = action->trigger; + if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " + PRINTF_TAG " to " PRINTF_TAG ".", + input_port_trigger->last_known_status_tag.time - lf_time_start(), + input_port_trigger->last_known_status_tag.microstep, + tag.time - lf_time_start(), tag.microstep); + input_port_trigger->last_known_status_tag = tag; + } +} + +/** + * Set the status of network port with id port_id. * - * @param portID The network port ID + * @param port_id The network port ID * @param status The network port status (port_status_t) */ -static void set_network_port_status(int portID, port_status_t status) { - lf_action_base_t* network_input_port_action = action_for_port(portID); +static void set_network_port_status(int port_id, port_status_t status) { + lf_action_base_t* network_input_port_action = action_for_port(port_id); network_input_port_action->trigger->status = status; } @@ -690,7 +720,7 @@ static int handle_port_absent_message(int* socket, int fed_id) { tracepoint_federate_from_federate(receive_PORT_ABS, _lf_my_fed_id, fed_id, &intended_tag); } LF_PRINT_LOG("Handling port absent for tag " PRINTF_TAG " for port %hu of fed %d.", - intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, fed_id); + intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, _lf_my_fed_id); // Environment is always the one corresponding to the top-level scheduling enclave. environment_t* env; @@ -999,7 +1029,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { * a notification of this update, which may unblock whichever worker * thread is trying to advance time. 
* - * @note This function is very similar to handle_provisinal_tag_advance_grant() except that + * @note This function is very similar to handle_provisional_tag_advance_grant() except that * it sets last_TAG_was_provisional to false. */ static void handle_tag_advance_grant(void) { @@ -1213,7 +1243,8 @@ static void* update_ports_from_staa_offsets(void* args) { * * @note This function is similar to handle_tag_advance_grant() except that * it sets last_TAG_was_provisional to true and also it does not update the - * last known tag for input ports. + * last known tag for input ports unless there is an upstream federate that is + * disconnected. */ static void handle_provisional_tag_advance_grant() { // Environment is always the one corresponding to the top-level scheduling enclave. @@ -1250,6 +1281,12 @@ static void handle_provisional_tag_advance_grant() { env->current_tag.time - start_time, env->current_tag.microstep, _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); + for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_disconnected[i] == true) { + update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG); + } + } + // Even if we don't modify the event queue, we need to broadcast a change // because we do not need to continue to wait for a TAG. lf_cond_broadcast(&env->event_q_changed); @@ -1486,6 +1523,44 @@ static void send_failed_signal() { */ static void handle_rti_failed_message(void) { exit(1); } +/** + * @brief Handle message from the RTI that an upstream federate has connected. 
+ * + */ +static void handle_upstream_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream connected message from RTI."); + uint16_t connected = extract_uint16(buffer); + lf_print("********* FIXME: Upstream %d connected *********\n", connected); + // Mark the upstream as connected. + for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = false; + } + } +} + +/** + * @brief Handle message from the RTI that an upstream federate has disconnected. + * + */ +static void handle_upstream_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream disconnected message from RTI."); + uint16_t disconnected = extract_uint16(buffer); + lf_print("********* FIXME: Upstream %d disconnected *********\n", disconnected); + // Mark the upstream as disconnected. + for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = true; + } + } +} + /** * Thread that listens for TCP inputs from the RTI. * When messages arrive, this calls the appropriate handler. 
@@ -1562,6 +1637,12 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_FAILED: handle_rti_failed_message(); break; + case MSG_TYPE_UPSTREAM_CONNECTED: + handle_upstream_connected_message(); + break; + case MSG_TYPE_UPSTREAM_DISCONNECTED: + handle_upstream_disconnected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id); @@ -1993,8 +2074,12 @@ void lf_connect_to_rti(const char* hostname, int port) { } else if (response == MSG_TYPE_RESIGN) { lf_print_warning("RTI on port %d resigned. Will try again", uport); continue; + } else if (response == MSG_TYPE_UPSTREAM_CONNECTED) { + handle_upstream_connected_message(); + } else if (response == MSG_TYPE_UPSTREAM_DISCONNECTED) { + handle_upstream_disconnected_message(); } else { - lf_print_warning("RTI on port %d gave unexpect response %u. Will try again", uport, response); + lf_print_warning("RTI on port %d gave unexpected response %u. Will try again", uport, response); continue; } } diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 7af7d939b..0bdeed6a3 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -671,6 +671,23 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_FAILED 25 +///////////////////////////////////////////// +//// Transient federate support + +/** + * A message that informs a downstream federate that a federate upstream of it + * is connected. The next 2 bytes are the federate ID of the upstream federate. + */ +#define MSG_TYPE_UPSTREAM_CONNECTED 26 +#define MSG_TYPE_UPSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) + +/** + * A message that informs a downstream federate that a federate upstream of it + * is no longer connected. The next 2 bytes are the federate ID of the upstream federate. 
+ */ +#define MSG_TYPE_UPSTREAM_DISCONNECTED 27 +#define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) + /** * As an answer to MSG_TYPE_TIMESTAMP, the RTI broadcasts to all persistent * federates, or sends to newly joining transient federate, a message of @@ -678,17 +695,17 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * together with the effective starting logical tag. The latter is useful for * transient federates. */ -#define MSG_TYPE_TIMESTAMP_START 50 +#define MSG_TYPE_TIMESTAMP_START 28 #define MSG_TYPE_TIMESTAMP_START_LENGTH (1 + sizeof(instant_t) + sizeof(instant_t) + sizeof(microstep_t)) /** - * Byte sent by the RTI ordering the federate to stop. Upon receiving the meaasage, - * the federate will call lf_stop(), which will make him resign at its current_tag + * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, + * the federate will call lf_stop(), which will make it resign at its current_tag * plus 1 microstep. - * The next 8 bytes will be the time at which the federates will stop. * + * The next 8 bytes will be the time at which the federates will stop. * The next 4 bytes will be the microstep at which the federates will stop.. 
*/ -#define MSG_TYPE_STOP 30 +#define MSG_TYPE_STOP 29 #define MSG_TYPE_STOP_LENGTH 1 ///////////////////////////////////////////// From 7fd3ce5826449174f11e85f5e6a4a824f5a58e5a Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 15 Jul 2024 17:01:48 +0100 Subject: [PATCH 02/40] Fix the type of _lf_zero_delay_cycle_action_table_size iterator to be size_t + automatic formatter --- core/federated/federate.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index e9cd57885..e5b53d29d 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -300,7 +300,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag * it is called when a PTAG is granted and an upstream transient federate is not * connected. It updates the last known status tag of the network input action * so that it will not wait for a message or absent message from the upstream federate. - * + * * This function assumes the caller holds the mutex on the top-level environment, * and, if the tag actually increases, it broadcasts on `lf_port_status_changed`. * @@ -309,14 +309,14 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag * @param tag The tag of the PTAG. 
*/ static void update_last_known_status_on_action(environment_t* env, lf_action_base_t* action, tag_t tag) { - if (lf_tag_compare(tag, env->current_tag) < 0) tag = env->current_tag; + if (lf_tag_compare(tag, env->current_tag) < 0) + tag = env->current_tag; trigger_t* input_port_trigger = action->trigger; if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { - LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " - PRINTF_TAG " to " PRINTF_TAG ".", - input_port_trigger->last_known_status_tag.time - lf_time_start(), - input_port_trigger->last_known_status_tag.microstep, - tag.time - lf_time_start(), tag.microstep); + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG + " to " PRINTF_TAG ".", + input_port_trigger->last_known_status_tag.time - lf_time_start(), + input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep); input_port_trigger->last_known_status_tag = tag; } } @@ -1281,7 +1281,7 @@ static void handle_provisional_tag_advance_grant() { env->current_tag.time - start_time, env->current_tag.microstep, _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); - for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { if (_lf_zero_delay_cycle_upstream_disconnected[i] == true) { update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG); } @@ -1525,7 +1525,7 @@ static void handle_rti_failed_message(void) { exit(1); } /** * @brief Handle message from the RTI that an upstream federate has connected. 
- * + * */ static void handle_upstream_connected_message(void) { size_t bytes_to_read = sizeof(uint16_t); @@ -1535,7 +1535,7 @@ static void handle_upstream_connected_message(void) { uint16_t connected = extract_uint16(buffer); lf_print("********* FIXME: Upstream %d connected *********\n", connected); // Mark the upstream as connected. - for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { _lf_zero_delay_cycle_upstream_disconnected[i] = false; } @@ -1544,7 +1544,7 @@ static void handle_upstream_connected_message(void) { /** * @brief Handle message from the RTI that an upstream federate has disconnected. - * + * */ static void handle_upstream_disconnected_message(void) { size_t bytes_to_read = sizeof(uint16_t); @@ -1554,7 +1554,7 @@ static void handle_upstream_disconnected_message(void) { uint16_t disconnected = extract_uint16(buffer); lf_print("********* FIXME: Upstream %d disconnected *********\n", disconnected); // Mark the upstream as disconnected. - for (int i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { _lf_zero_delay_cycle_upstream_disconnected[i] = true; } From ac14760bc4fa3feb930073b858d46e1b6167821d Mon Sep 17 00:00:00 2001 From: "Edward A. 
Lee" Date: Mon, 15 Jul 2024 16:30:48 -0400 Subject: [PATCH 03/40] Removed unnecessary parens --- core/federated/federate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index e5b53d29d..130552eb6 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2848,7 +2848,7 @@ char* lf_get_federates_bin_directory() { bin_directory_defined = true; #endif if (bin_directory_defined) { - return (LF_FEDERATES_BIN_DIRECTORY); + return LF_FEDERATES_BIN_DIRECTORY; } return NULL; } From ec5b2d6e6b9f1ffcdfee739759d017f9e19849ee Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 17 Jul 2024 06:58:17 +0100 Subject: [PATCH 04/40] Run clang formatter --- core/federated/RTI/rti_remote.c | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 0e4616a69..f8bc39c47 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -365,13 +365,12 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { /** * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified federate. - * + * * This function assumes that the mutex lock is already held. * @param destination The destination federate. * @param disconnected The connected federate. */ -static void send_upstream_connected_locked( - federate_info_t* destination, federate_info_t* connected) { +static void send_upstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { if (!connected->is_transient) { // No need to send connected message for persistent federates. 
return; @@ -380,26 +379,23 @@ static void send_upstream_connected_locked( buffer[0] = MSG_TYPE_UPSTREAM_CONNECTED; encode_uint16(connected->enclave.id, &buffer[1]); if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", - connected->enclave.id); + lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", connected->enclave.id); } } /** * @brief Send MSG_TYPE_UPSTREAM_DISCONNECTED to the specified federate. - * + * * This function assumes that the mutex lock is already held. * @param destination The destination federate. * @param disconnected The disconnected federate. */ -static void send_upstream_disconnected_locked( - federate_info_t* destination, federate_info_t* disconnected) { +static void send_upstream_disconnected_locked(federate_info_t* destination, federate_info_t* disconnected) { unsigned char buffer[MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH]; buffer[0] = MSG_TYPE_UPSTREAM_DISCONNECTED; encode_uint16(disconnected->enclave.id, &buffer[1]); if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", - disconnected->enclave.id); + lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", disconnected->enclave.id); } } @@ -1095,7 +1091,7 @@ void handle_address_ad(uint16_t federate_id) { * and federate_start_tag.time (the federate_start_tag.microstep will be 0). * If, however, the startup phase is passed, the federate will receive different * values than stated above. - * + * * This will also notify federates downstream of my_fed that this federate is now * connected. This is important when there are zero-delay cycles. 
* @@ -1127,12 +1123,12 @@ void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, ta my_fed->enclave.state = GRANTED; lf_cond_broadcast(&sent_start_time); LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); - + // Notify downstream federates of this now connected transient. for (int i = 0; i < my_fed->enclave.num_upstream; i++) { send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.upstream[i]), my_fed); } - + LF_MUTEX_UNLOCK(&rti_mutex); } From 322dde61fd016435f1872d3231dd2dcc8221a16b Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 17 Jul 2024 09:03:12 -0400 Subject: [PATCH 05/40] Fixed thread interactions --- core/federated/RTI/rti_remote.c | 145 +++++++++++++------------------- core/federated/federate.c | 27 +++--- 2 files changed, 76 insertions(+), 96 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index f8bc39c47..d45be2752 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -40,13 +40,13 @@ extern instant_t start_time; static rti_remote_t* rti_remote; // Referance to the federate instance to support hot swap -federate_info_t* hot_swap_federate; +static federate_info_t* hot_swap_federate; // Indicates if a hot swap process is in progress -bool hot_swap_in_progress = false; +static bool hot_swap_in_progress = false; // Indicates that the old federate has stopped. -bool hot_swap_old_resigned = false; +static bool hot_swap_old_resigned = false; bool _lf_federate_reports_error = false; @@ -282,24 +282,15 @@ static int create_rti_server(uint16_t port, socket_type_t socket_type) { } /** - * @brief Insert the delayed grant into the delayed_grants queue. - * - * The insertion will cause the broadcast to cause the delayed_grants_thread to - * account for the update. - * - * In case there is already a grant for that federate, keep the soonest one. - * FIXME: Is that correct? 
- * FIXME: Why not just add it to the queue? - * + * @brief Insert the delayed grant into the delayed_grants queue and notify. + * + * This function assumes the caller holds the rti_mutex. * @param fed The federate. * @param tag The tag to grant. * @param is_provisional State whther the grant is provisional. */ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provisional) { - // Check wether there is already a pending grant, - // and check the pending provisional grant as well - // Iterate over the - LF_MUTEX_LOCK(&rti_mutex); + // Check whether there is already a pending grant. pqueue_delayed_grant_element_t* dge = pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, fed->enclave.id); if (dge == NULL) { @@ -311,39 +302,29 @@ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provis dge->is_provisional = is_provisional; pqueue_delayed_grants_insert(rti_remote->delayed_grants, dge); LF_PRINT_LOG("RTI: Inserting a delayed grant of " PRINTF_TAG " for federate %d.", dge->base.tag.time - start_time, - dge->base.tag.microstep, dge->fed_id); + dge->base.tag.microstep, dge->fed_id); lf_cond_signal(&updated_delayed_grants); } else { - // FIXME: Decide what to do in this case... - // TODO: do it! - // FIXME: Add to the queue? - } - LF_MUTEX_UNLOCK(&rti_mutex); -} - -/** - * @brief Cancel a delayed grant by removing it from delayed_grants queue. - * - * The removal will cause the broadcast to cause the delayed_grants_thread to - * account for the update. - * - * In case there is already a grant for that federte, keep the soonest one. - * FIXME: Is that correct? - * - * @param fed The federate. 
- */ - -void notify_grant_canceled(federate_info_t* fed) { - LF_MUTEX_LOCK(&rti_mutex); - pqueue_delayed_grant_element_t* dge = - pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, fed->enclave.id); - if (dge != NULL) { - pqueue_delayed_grants_remove(rti_remote->delayed_grants, dge); - LF_PRINT_LOG("RTI: Canceling the delayed grant of " PRINTF_TAG " for federate %d.", dge->base.tag.time - start_time, - dge->base.tag.microstep, dge->fed_id); - lf_cond_signal(&updated_delayed_grants); + // Note that there should never be more than one pending grant for a federate. + int compare = lf_tag_compare(dge->base.tag, tag); + if (compare > 0) { + // Update the pre-existing grant. + dge->base.tag = tag; + dge->is_provisional = is_provisional; + LF_PRINT_LOG("RTI: Updating a delayed grant of " PRINTF_TAG " for federate %d.", tag.time - start_time, + tag.microstep, dge->fed_id); + lf_cond_signal(&updated_delayed_grants); + } else if (compare == 0) { + if (dge->is_provisional != is_provisional) { + // Update the grant to keep the most recent is_provisional status. + dge->is_provisional = is_provisional; + LF_PRINT_LOG("RTI: Changing status of a delayed grant of " PRINTF_TAG " for federate %d to provisional: %d.", + dge->base.tag.time - start_time, + dge->base.tag.microstep, dge->fed_id, + is_provisional); + } + } } - LF_MUTEX_UNLOCK(&rti_mutex); } /** @@ -1183,8 +1164,8 @@ void handle_timestamp(federate_info_t* my_fed) { my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; send_start_tag(my_fed, start_time, my_fed->effective_start_tag); } else if (rti_remote->phase == shutdown_phase) { - // Do not answer the federate if the federation is in hsutdown phase - // Or maybe send and error message? + // Do not answer the federate if the federation is in shutdown phase. + // FIXME: Or maybe send an error message? 
LF_MUTEX_UNLOCK(&rti_mutex); return; } else { // The federation is the execution phase @@ -1260,7 +1241,7 @@ void handle_timestamp(federate_info_t* my_fed) { } // Check the pending grants, if any, and keep it only if it is - // sonner than the effective start tag + // sooner than the effective start tag pqueue_delayed_grant_element_t* dge = pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, downstream->enclave.id); if (dge != NULL && lf_tag_compare(dge->base.tag, my_fed->effective_start_tag) > 0) { @@ -2263,9 +2244,8 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { // Wait for the old federate to send MSG_TYPE_RESIGN LF_PRINT_LOG("RTI: Waiting for old federate %d to send resign.", fed_id); - // FIXME: Should this have a timeout? - while (!hot_swap_old_resigned) - ; + // FIXME: This is a busy wait! Need instead a lf_cond_wait on a condition variable. + while (!hot_swap_old_resigned) {} // The latest LTC is the tag at which the old federate resigned. This is useful // for computing the effective_start_time of the new joining federate. @@ -2328,47 +2308,40 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { */ void* lf_delayed_grants_thread(void* nothing) { initialize_lf_thread_id(); - - // Wait for the first condition signal - lf_cond_wait(&updated_delayed_grants); - - while (true) { - if (rti_remote->all_federates_exited) { - break; - } - if (pqueue_delayed_grants_size(rti_remote->delayed_grants) != 0) { - pqueue_delayed_grant_element_t* next; - - // Do not pop, but rather read - next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + // Hold the mutex except while waiting. + LF_MUTEX_LOCK(&rti_mutex); + while (!rti_remote->all_federates_exited) { + if (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { + // Do not pop, but rather peek. 
+ pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); instant_t next_time = next->base.tag.time; // Wait for expiration, or a signal to stop or terminate. if (lf_clock_cond_timedwait(&updated_delayed_grants, next_time)) { - // Time reached to send the grant. Do it for delayed grants with the same tag - LF_MUTEX_LOCK(&rti_mutex); - next = pqueue_delayed_grants_pop(rti_remote->delayed_grants); - federate_info_t* fed = GET_FED_INFO(next->fed_id); - if (next->is_provisional) { - notify_provisional_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); - // FIXME: Send port absent notification to all federates downstream of absent federates. - } else { - notify_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + // Time reached to send the grant. + // However, the grant may have been canceled while we were waiting. + pqueue_delayed_grant_element_t* new_next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + if (next == new_next) { + pqueue_delayed_grants_pop(rti_remote->delayed_grants); + federate_info_t* fed = GET_FED_INFO(next->fed_id); + if (next->is_provisional) { + notify_provisional_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } else { + notify_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } + free(next); } - LF_MUTEX_UNLOCK(&rti_mutex); - } else { - // Waiting was interrupted, because of an update in the queue, or - // because this thread needs to terminate - lf_print("RTI: lf_delayed_grants_thread() did not send grant to %d at " PRINTF_TIME ", but rather terminated!", - next->fed_id, next_time - start_time); } + } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { + // Wait for something to appear on the queue. + lf_cond_wait(&updated_delayed_grants); } } - // The federation is at the shutdown phase. All persistent federates exited. - // We can do a sanity check that the delayed_grants queue is empty. 
- // FIXME: If there are still pending grants, what does that mean? Maybe that the - // federation stopped after a request to stop (not a timeout). Therefore, we need - // cleanup, and free the memory... - // TODO: do it! + // Free any delayed grants that are still on the queue. + while (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { + pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_pop(rti_remote->delayed_grants); + free(next); + } + LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } diff --git a/core/federated/federate.c b/core/federated/federate.c index 130552eb6..f072f3a7d 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1282,7 +1282,7 @@ static void handle_provisional_tag_advance_grant() { _fed.last_TAG.microstep); for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { - if (_lf_zero_delay_cycle_upstream_disconnected[i] == true) { + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG); } } @@ -1533,7 +1533,7 @@ static void handle_upstream_connected_message(void) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, "Failed to read upstream connected message from RTI."); uint16_t connected = extract_uint16(buffer); - lf_print("********* FIXME: Upstream %d connected *********\n", connected); + LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); // Mark the upstream as connected. 
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { @@ -1552,7 +1552,7 @@ static void handle_upstream_disconnected_message(void) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, "Failed to read upstream disconnected message from RTI."); uint16_t disconnected = extract_uint16(buffer); - lf_print("********* FIXME: Upstream %d disconnected *********\n", disconnected); + LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); // Mark the upstream as disconnected. for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { @@ -2800,13 +2800,20 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { _lf_action_delay_table[i])) <= 0)) { continue; } +#else + // For centralized coordination, if there is an upstream transient federate that is not + // connected, then we don't want to block on its action. + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { + // Mark the action known up to and including the current tag. It is absent. + update_last_known_status_on_action(env, input_port_action, env->current_tag); + } #endif // FEDERATED_DECENTRALIZED - // If the current tag is greater than the last known status tag of the input port, - // and the input port is not physical, then block on that port by ensuring - // the MLAA is no greater than the level of that port. - // For centralized coordination, this is applied only to input ports coming from - // federates that are in a ZDC. For decentralized coordination, this is applied - // to all input ports. + // If the current tag is greater than the last known status tag of the input port, + // and the input port is not physical, then block on that port by ensuring + // the MLAA is no greater than the level of that port. 
+ // For centralized coordination, this is applied only to input ports coming from + // federates that are in a ZDC. For decentralized coordination, this is applied + // to all input ports. if (lf_tag_compare(env->current_tag, input_port_action->trigger->last_known_status_tag) > 0 && !input_port_action->trigger->is_physical) { max_level_allowed_to_advance = @@ -2831,7 +2838,7 @@ void lf_stop() { lf_set_stop_tag(&env[i], new_stop_tag); - lf_print("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, + LF_PRINT_LOG("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, env[i].stop_tag.microstep); if (env[i].barrier.requestors) From da02bb627c06016bc2af37f48936aa729bebf7b5 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 17 Jul 2024 09:17:02 -0400 Subject: [PATCH 06/40] Format --- core/federated/RTI/rti_remote.c | 13 ++++++------- core/federated/federate.c | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index d45be2752..f8509f615 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -283,7 +283,7 @@ static int create_rti_server(uint16_t port, socket_type_t socket_type) { /** * @brief Insert the delayed grant into the delayed_grants queue and notify. - * + * * This function assumes the caller holds the rti_mutex. * @param fed The federate. * @param tag The tag to grant. 
@@ -302,7 +302,7 @@ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provis dge->is_provisional = is_provisional; pqueue_delayed_grants_insert(rti_remote->delayed_grants, dge); LF_PRINT_LOG("RTI: Inserting a delayed grant of " PRINTF_TAG " for federate %d.", dge->base.tag.time - start_time, - dge->base.tag.microstep, dge->fed_id); + dge->base.tag.microstep, dge->fed_id); lf_cond_signal(&updated_delayed_grants); } else { // Note that there should never be more than one pending grant for a federate. @@ -312,16 +312,14 @@ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provis dge->base.tag = tag; dge->is_provisional = is_provisional; LF_PRINT_LOG("RTI: Updating a delayed grant of " PRINTF_TAG " for federate %d.", tag.time - start_time, - tag.microstep, dge->fed_id); + tag.microstep, dge->fed_id); lf_cond_signal(&updated_delayed_grants); } else if (compare == 0) { if (dge->is_provisional != is_provisional) { // Update the grant to keep the most recent is_provisional status. dge->is_provisional = is_provisional; LF_PRINT_LOG("RTI: Changing status of a delayed grant of " PRINTF_TAG " for federate %d to provisional: %d.", - dge->base.tag.time - start_time, - dge->base.tag.microstep, dge->fed_id, - is_provisional); + dge->base.tag.time - start_time, dge->base.tag.microstep, dge->fed_id, is_provisional); } } } @@ -2245,7 +2243,8 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { // Wait for the old federate to send MSG_TYPE_RESIGN LF_PRINT_LOG("RTI: Waiting for old federate %d to send resign.", fed_id); // FIXME: This is a busy wait! Need instead a lf_cond_wait on a condition variable. - while (!hot_swap_old_resigned) {} + while (!hot_swap_old_resigned) { + } // The latest LTC is the tag at which the old federate resigned. This is useful // for computing the effective_start_time of the new joining federate. 
diff --git a/core/federated/federate.c b/core/federated/federate.c index f072f3a7d..c4a021c68 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2839,7 +2839,7 @@ void lf_stop() { lf_set_stop_tag(&env[i], new_stop_tag); LF_PRINT_LOG("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, - env[i].stop_tag.microstep); + env[i].stop_tag.microstep); if (env[i].barrier.requestors) _lf_decrement_tag_barrier_locked(&env[i]); From e902d3ffa8238a3915a409f5dc47ddfc05df8e94 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Thu, 18 Jul 2024 14:09:25 +0100 Subject: [PATCH 07/40] Fix mutex access in lf_delayed_grants() --- core/federated/RTI/rti_remote.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index f8509f615..2d1b0d6e1 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -2307,17 +2307,19 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { */ void* lf_delayed_grants_thread(void* nothing) { initialize_lf_thread_id(); - // Hold the mutex except while waiting. - LF_MUTEX_LOCK(&rti_mutex); + // Hold the mutex only when accessing rti_remote->delayed_grants pqueue while (!rti_remote->all_federates_exited) { if (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { // Do not pop, but rather peek. + LF_MUTEX_LOCK(&rti_mutex); pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); instant_t next_time = next->base.tag.time; + LF_MUTEX_UNLOCK(&rti_mutex); // Wait for expiration, or a signal to stop or terminate. if (lf_clock_cond_timedwait(&updated_delayed_grants, next_time)) { // Time reached to send the grant. // However, the grant may have been canceled while we were waiting. 
+ LF_MUTEX_LOCK(&rti_mutex); pqueue_delayed_grant_element_t* new_next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); if (next == new_next) { pqueue_delayed_grants_pop(rti_remote->delayed_grants); @@ -2330,6 +2332,7 @@ void* lf_delayed_grants_thread(void* nothing) { free(next); } } + LF_MUTEX_UNLOCK(&rti_mutex); } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { // Wait for something to appear on the queue. lf_cond_wait(&updated_delayed_grants); @@ -2340,7 +2343,6 @@ void* lf_delayed_grants_thread(void* nothing) { pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_pop(rti_remote->delayed_grants); free(next); } - LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } From c7ea64d1ee17583175f929d2fa29c5cc4b964510 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Thu, 18 Jul 2024 16:09:09 -0400 Subject: [PATCH 08/40] Made functions static and use free function for queue --- core/federated/RTI/rti_remote.c | 29 ++++++++++++++--------------- core/federated/RTI/rti_remote.h | 5 ----- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 2d1b0d6e1..7bc3015f3 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -73,7 +73,7 @@ extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock * @param nbr_delayed_grants The size. * @return The dynamically allocated queue or NULL. */ -pqueue_delayed_grants_t* pqueue_delayed_grants_init(uint16_t nbr_delayed_grants) { +static pqueue_delayed_grants_t* pqueue_delayed_grants_init(uint16_t nbr_delayed_grants) { return (pqueue_delayed_grants_t*)pqueue_tag_init((size_t)nbr_delayed_grants); } @@ -83,7 +83,7 @@ pqueue_delayed_grants_t* pqueue_delayed_grants_init(uint16_t nbr_delayed_grants) * @param q The queue. * @return The size. 
*/ -size_t pqueue_delayed_grants_size(pqueue_delayed_grants_t* q) { return pqueue_tag_size((pqueue_tag_t*)q); } +static size_t pqueue_delayed_grants_size(pqueue_delayed_grants_t* q) { return pqueue_tag_size((pqueue_tag_t*)q); } /** * @brief Insert an\ delayed grant element into the queue. @@ -92,7 +92,7 @@ size_t pqueue_delayed_grants_size(pqueue_delayed_grants_t* q) { return pqueue_ta * @param e The delayed grant element to insert. * @return 0 on success */ -int pqueue_delayed_grants_insert(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* d) { +static int pqueue_delayed_grants_insert(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* d) { return pqueue_tag_insert((pqueue_tag_t*)q, (void*)d); } @@ -102,7 +102,7 @@ int pqueue_delayed_grants_insert(pqueue_delayed_grants_t* q, pqueue_delayed_gran * @param q The queue. * @return NULL on error, otherwise the entry */ -pqueue_delayed_grant_element_t* pqueue_delayed_grants_pop(pqueue_delayed_grants_t* q) { +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_pop(pqueue_delayed_grants_t* q) { return (pqueue_delayed_grant_element_t*)pqueue_tag_pop((pqueue_tag_t*)q); } @@ -112,7 +112,7 @@ pqueue_delayed_grant_element_t* pqueue_delayed_grants_pop(pqueue_delayed_grants_ * @param q The queue. * @return NULL on if the queue is empty, otherwise the delayed grant element. */ -pqueue_delayed_grant_element_t* pqueue_delayed_grants_peek(pqueue_delayed_grants_t* q) { +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_peek(pqueue_delayed_grants_t* q) { return (pqueue_delayed_grant_element_t*)pqueue_tag_peek((pqueue_tag_t*)q); } @@ -121,7 +121,7 @@ pqueue_delayed_grant_element_t* pqueue_delayed_grants_peek(pqueue_delayed_grants * * @param q The queue. 
*/ -void pqueue_delayed_grants_free(pqueue_delayed_grants_t* q) { pqueue_tag_free((pqueue_tag_t*)q); } +static void pqueue_delayed_grants_free(pqueue_delayed_grants_t* q) { pqueue_tag_free((pqueue_tag_t*)q); } /** * @brief Remove an item from the delayed grants queue. @@ -129,7 +129,7 @@ void pqueue_delayed_grants_free(pqueue_delayed_grants_t* q) { pqueue_tag_free((p * @param q The queue. * @param e The entry to remove. */ -void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* e) { +static void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* e) { pqueue_tag_remove((pqueue_tag_t*)q, (void*)e); } @@ -140,7 +140,7 @@ void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_delayed_gra * @param fed_id The federate id. * @return An entry with the specified federate if or NULL if there isn't one. */ -pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, uint16_t fed_id) { +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, uint16_t fed_id) { pqueue_delayed_grant_element_t* dge; if (!q || q->size == 1) return NULL; @@ -2299,13 +2299,15 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { } /** + * @brief Thread that manages the delayed grants using a priprity queue. + * * This thread is responsible for managing the priority queue of delayed grants to be issued. * It waits until the current time matches the highest priority tag time in the queue. * If reached, it notifies the grant immediately. If, however, the current time has not yet * reached the highest priority tag and the queue has been updated (either by inserting or * canceling an entry), the thread stops waiting and restarts the process again. 
*/ -void* lf_delayed_grants_thread(void* nothing) { +static void* lf_delayed_grants_thread(void* nothing) { initialize_lf_thread_id(); // Hold the mutex only when accessing rti_remote->delayed_grants pqueue while (!rti_remote->all_federates_exited) { @@ -2331,18 +2333,15 @@ void* lf_delayed_grants_thread(void* nothing) { } free(next); } - } - LF_MUTEX_UNLOCK(&rti_mutex); + } + LF_MUTEX_UNLOCK(&rti_mutex); } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { // Wait for something to appear on the queue. lf_cond_wait(&updated_delayed_grants); } } // Free any delayed grants that are still on the queue. - while (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { - pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_pop(rti_remote->delayed_grants); - free(next); - } + pqueue_delayed_grants_free(rti_remote->delayed_grants); return NULL; } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index df86803de..608f70eb2 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -409,11 +409,6 @@ void lf_connect_to_persistent_federates(int socket_descriptor); */ void* lf_connect_to_transient_federates_thread(void* nothing); -/** - * Thread that manages the delayed grants using a priprity queue. - */ -void* lf_delayed_grants_thread(void* nothing); - /** * Thread to respond to new connections, which could be federates of other * federations who are attempting to join the wrong federation. From 46abf8cc7d7e1e373acd095bf5311973390499cd Mon Sep 17 00:00:00 2001 From: "Edward A. 
Lee" Date: Fri, 19 Jul 2024 09:42:08 -0400 Subject: [PATCH 09/40] Added max function --- core/utils/pqueue_tag.c | 11 +++++++++++ include/core/utils/pqueue_tag.h | 7 +++++++ test/general/utils/pqueue_test.c | 2 ++ 3 files changed, 20 insertions(+) diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c index c1abe35ba..9a7491dd7 100644 --- a/core/utils/pqueue_tag.c +++ b/core/utils/pqueue_tag.c @@ -159,3 +159,14 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) { } void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); } + +tag_t pqueue_tag_max_tag(pqueue_tag_t* q) { + tag_t result = NEVER_TAG; + for (int i = 1; i < q->size; i++) { + pqueue_tag_element_t* element = (pqueue_tag_element_t*)(q->d[i]); + if (lf_tag_compare(element->tag, result) > 0) { + result = element->tag; + } + } + return result; +} diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h index e06e074be..dec61a5cd 100644 --- a/include/core/utils/pqueue_tag.h +++ b/include/core/utils/pqueue_tag.h @@ -216,4 +216,11 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t); */ void pqueue_tag_dump(pqueue_tag_t* q); +/** + * @brief Return the maximum tag in the queue or NEVER_TAG if the queue is empty. + * + * @param q The queue. + */ +tag_t pqueue_tag_max_tag(pqueue_tag_t* q); + #endif // PQUEUE_TAG_H diff --git a/test/general/utils/pqueue_test.c b/test/general/utils/pqueue_test.c index 665c4e13f..18b3009a8 100644 --- a/test/general/utils/pqueue_test.c +++ b/test/general/utils/pqueue_test.c @@ -23,6 +23,8 @@ static void insert_on_queue(pqueue_tag_t* q) { assert(!pqueue_tag_insert_tag(q, t2)); assert(!pqueue_tag_insert_tag(q, t3)); + assert(lf_tag_compare(pqueue_tag_max_tag(q), t1) == 0); + assert(!pqueue_tag_insert_if_no_match(q, t4)); assert(pqueue_tag_insert_if_no_match(q, t1)); assert(pqueue_tag_insert_if_no_match(q, t4)); From 8c85e1f51d83689d57fff4a0ef9f0908c816ff4d Mon Sep 17 00:00:00 2001 From: "Edward A. 
Lee" Date: Fri, 19 Jul 2024 09:53:30 -0400 Subject: [PATCH 10/40] Revert to holding mutex plus general cleanups --- core/federated/RTI/rti_remote.c | 113 ++++++++++---------- core/federated/RTI/rti_remote.h | 8 +- include/core/federated/network/net_common.h | 31 ++---- 3 files changed, 74 insertions(+), 78 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 7bc3015f3..be7af2568 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -55,9 +55,9 @@ bool _lf_federate_reports_error = false; #define GET_FED_INFO(_idx) (federate_info_t*)rti_remote->base.scheduling_nodes[_idx] lf_mutex_t rti_mutex; -lf_cond_t received_start_times; -lf_cond_t sent_start_time; -lf_cond_t updated_delayed_grants; +static lf_cond_t received_start_times; +static lf_cond_t sent_start_time; +static lf_cond_t updated_delayed_grants; extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(&rti_mutex); } @@ -1126,74 +1126,72 @@ void handle_timestamp(federate_info_t* my_fed) { LF_MUTEX_LOCK(&rti_mutex); - // Processing the TIMESTAMP depends on whether it is the startup phase (all - // persistent federates joined) or not. - if (rti_remote->phase == - startup_phase) { // This is equivalent to: rti_remote->num_feds_proposed_start < (rti_remote->number_of_enclaves - - // rti_remote->number_of_transient_federates) + // Processing the TIMESTAMP depends on whether it is the startup phase. + if (rti_remote->phase == startup_phase) { + // Not all persistent federates have proposed a start time. if (timestamp > rti_remote->max_start_time) { rti_remote->max_start_time = timestamp; } - // Check that persistent federates did propose a start_time + // Note that if a transient federate's thread gets here during the startup phase, + // then it will be assigned the same global tag as its effective start tag and its + // timestamp will affect that start tag. 
if (!my_fed->is_transient) { rti_remote->num_feds_proposed_start++; } if (rti_remote->num_feds_proposed_start == (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { - // All federates have proposed a start time. + // This federate is the last persistent federate to proposed a start time. lf_cond_broadcast(&received_start_times); rti_remote->phase = execution_phase; } else { - // Some federates have not yet proposed a start time. - // wait for a notification. + // Wait until all persistent federates have proposed a start time. while (rti_remote->num_feds_proposed_start < (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { - // FIXME: Should have a timeout here? lf_cond_wait(&received_start_times); } } + // Add an offset to the maximum tag to get everyone starting together. + start_time = rti_remote->max_start_time + DELAY_START; + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; LF_MUTEX_UNLOCK(&rti_mutex); - // Send back to the federate the maximum time plus an offset on a TIMESTAMP - // message. - // Add an offset to this start time to get everyone starting together. - start_time = rti_remote->max_start_time + DELAY_START; - my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; + // Notify the federate of its start tag. send_start_tag(my_fed, start_time, my_fed->effective_start_tag); - } else if (rti_remote->phase == shutdown_phase) { - // Do not answer the federate if the federation is in sutdown phase. - // FIXME: Or maybe send and error message? 
+ } else if (rti_remote->phase == shutdown_phase || !my_fed->is_transient) { LF_MUTEX_UNLOCK(&rti_mutex); - return; - } else { // The federation is the execution phase - // A transient has joined after the startup phase - // At this point, we already hold the mutex - // This is rather a possible extreme corner case, where a transient sends its timestamp, and only - // enters the if section after all persistents have joined. - if (timestamp < start_time) { - timestamp = start_time; - } + // Send reject message if the federation is in shutdown phase or if + // it is in the execution phase but the federate is persistent. + send_reject(&my_fed->socket, JOINING_TOO_LATE); + return; + } else { + // The federation is transient and we are in the execution phase. + // At this point, we already hold the mutex. //// Algorithm for computing the effective_start_time of a joining transient // The effective_start_time will be the max among all the following tags: // 1. At tag: (joining time, 0 microstep) - // 2. The latest completed logical tag + 1 microstep - // 3. The latest granted (P)TAG + 1 microstep, of every downstream federate - // 4. The maximun tag of messages from the upstream federates + 1 microstep + // 2. (start_time, 1 microstep) + // 3. The latest completed logical tag + 1 microstep + // 4. The latest granted (P)TAG + 1 microstep, of every downstream federate + // 5. The maximun tag of messages from the upstream federates + 1 microstep // Condition 1. my_fed->effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; // Condition 2. - // FIXME: Not sure if this corner case can happen, but better to be on the safe side. + if (timestamp < start_time) { + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 1u}; + } + + // Condition 3. if (lf_tag_compare(my_fed->enclave.completed, my_fed->effective_start_tag) >= 0) { my_fed->effective_start_tag = my_fed->enclave.completed; my_fed->effective_start_tag.microstep++; } - // Condition 3. 
Iterate over the downstream federates + // Condition 4. Iterate over the downstream federates for (int j = 0; j < my_fed->enclave.num_downstream; j++) { federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.downstream[j]); @@ -1210,16 +1208,18 @@ void handle_timestamp(federate_info_t* my_fed) { } } - // Condition 4. Iterate over the messages from the upstream federates + // Condition 5. + // This one is a bit subtle. Any messages from upstream federates that the RTI has + // not yet seen will be sent to this joining federate after the effective_start_tag + // because the effective_start_tag is sent while still holding the mutex. + + // Iterate over the messages from the upstream federates for (int j = 0; j < my_fed->enclave.num_upstream; j++) { federate_info_t* upstream = GET_FED_INFO(my_fed->enclave.upstream[j]); - // Get the max over the TAG of the upstreams size_t queue_size = pqueue_tag_size(upstream->in_transit_message_tags); if (queue_size != 0) { - pqueue_t* pq = (pqueue_t*)(upstream->in_transit_message_tags); - pqueue_tag_element_t* message_with_max_tag = (pqueue_tag_element_t*)(pq->d[queue_size]); - tag_t max_tag = message_with_max_tag->tag; + tag_t max_tag = pqueue_tag_max_tag(upstream->in_transit_message_tags); if (lf_tag_compare(max_tag, my_fed->effective_start_tag) >= 0) { my_fed->effective_start_tag = max_tag; @@ -1228,8 +1228,8 @@ void handle_timestamp(federate_info_t* my_fed) { } } - // For every downstream that has a pending grant that is higher then the - // effective_start_time of the federate, cancel it + // For every downstream that has a pending grant that is higher than the + // effective_start_time of the federate, cancel it. 
for (int j = 0; j < my_fed->enclave.num_downstream; j++) { federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.downstream[j]); @@ -1239,7 +1239,7 @@ void handle_timestamp(federate_info_t* my_fed) { } // Check the pending grants, if any, and keep it only if it is - // sooner than the effective start tag + // sooner than the effective start tag. pqueue_delayed_grant_element_t* dge = pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, downstream->enclave.id); if (dge != NULL && lf_tag_compare(dge->base.tag, my_fed->effective_start_tag) > 0) { @@ -1247,13 +1247,14 @@ void handle_timestamp(federate_info_t* my_fed) { } } - LF_MUTEX_UNLOCK(&rti_mutex); - // Once the effective start time set, sent it to the joining transient, // together with the start time of the federation. - // Send the start time + // Have to send the start tag while still holding the mutex to ensure that no message + // from an upstream federate is forwarded before the start tag. send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + + LF_MUTEX_UNLOCK(&rti_mutex); } } @@ -1580,11 +1581,11 @@ void* federate_info_thread_TCP(void* fed) { return NULL; } -void send_reject(int* socket_id, unsigned char error_code) { +void send_reject(int* socket_id, rejection_code_t error_code) { LF_PRINT_DEBUG("RTI sending MSG_TYPE_REJECT."); unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = error_code; + response[1] = (unsigned char)error_code; LF_MUTEX_LOCK(&rti_mutex); // NOTE: Ignore errors on this response. if (write_to_socket(*socket_id, 2, response)) { @@ -2309,19 +2310,18 @@ void* lf_connect_to_transient_federates_thread(void* nothing) { */ static void* lf_delayed_grants_thread(void* nothing) { initialize_lf_thread_id(); - // Hold the mutex only when accessing rti_remote->delayed_grants pqueue + // Hold the mutex when not waiting. 
+ LF_MUTEX_LOCK(&rti_mutex); while (!rti_remote->all_federates_exited) { if (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { // Do not pop, but rather peek. - LF_MUTEX_LOCK(&rti_mutex); pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); instant_t next_time = next->base.tag.time; - LF_MUTEX_UNLOCK(&rti_mutex); // Wait for expiration, or a signal to stop or terminate. - if (lf_clock_cond_timedwait(&updated_delayed_grants, next_time)) { + int ret = lf_clock_cond_timedwait(&updated_delayed_grants, next_time); + if (ret == LF_TIMEOUT) { // Time reached to send the grant. // However, the grant may have been canceled while we were waiting. - LF_MUTEX_LOCK(&rti_mutex); pqueue_delayed_grant_element_t* new_next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); if (next == new_next) { pqueue_delayed_grants_pop(rti_remote->delayed_grants); @@ -2333,8 +2333,10 @@ static void* lf_delayed_grants_thread(void* nothing) { } free(next); } - } - LF_MUTEX_UNLOCK(&rti_mutex); + } else if (ret != 0) { + // An error occurred. + lf_print_error_and_exit("lf_delayed_grants_thread: lf_clock_cond_timedwait failed with code %d.", ret); + } } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { // Wait for something to appear on the queue. lf_cond_wait(&updated_delayed_grants); @@ -2342,6 +2344,7 @@ static void* lf_delayed_grants_thread(void* nothing) { } // Free any delayed grants that are still on the queue. pqueue_delayed_grants_free(rti_remote->delayed_grants); + LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 608f70eb2..d7c1b5fe7 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -83,7 +83,11 @@ typedef enum clock_sync_stat { clock_sync_off, clock_sync_init, clock_sync_on } /** * The federation life cycle phases. 
*/ -typedef enum federation_life_cycle_phase { startup_phase, execution_phase, shutdown_phase } federation_life_cycle_phase; +typedef enum federation_life_cycle_phase { + startup_phase, // Not all persistent federates have joined. + execution_phase, // All persistent federates have joined. + shutdown_phase // Federation is shutting down. +} federation_life_cycle_phase; /** * @brief The type for an element in a delayed grants priority queue that is sorted by tag. @@ -391,7 +395,7 @@ void* federate_info_thread_TCP(void* fed); * @param socket_id Pointer to the socket ID. * @param error_code An error code. */ -void send_reject(int* socket_id, unsigned char error_code); +void send_reject(int* socket_id, rejection_code_t error_code); /** * Wait for one incoming connection request from each (persistent) federate, diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 0bdeed6a3..062126b75 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -715,26 +715,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * These codes are sent in a MSG_TYPE_REJECT message. * They are limited to one byte (uchar). */ - -/** Federation ID does not match. */ -#define FEDERATION_ID_DOES_NOT_MATCH 1 - -/** Federate with the specified ID has already joined. */ -#define FEDERATE_ID_IN_USE 2 - -/** Federate ID out of range. */ -#define FEDERATE_ID_OUT_OF_RANGE 3 - -/** Incoming message is not expected. */ -#define UNEXPECTED_MESSAGE 4 - -/** Connected to the wrong server. */ -#define WRONG_SERVER 5 - -/** HMAC authentication failed. */ -#define HMAC_DOES_NOT_MATCH 6 - -/** RTI not executed using -a or --auth option. 
*/ -#define RTI_NOT_EXECUTED_WITH_AUTH 7 +typedef enum { + FEDERATION_ID_DOES_NOT_MATCH = 1, + FEDERATE_ID_IN_USE = 2, + FEDERATE_ID_OUT_OF_RANGE = 3, + UNEXPECTED_MESSAGE = 4, + WRONG_SERVER = 5, + HMAC_DOES_NOT_MATCH = 6, + RTI_NOT_EXECUTED_WITH_AUTH = 7, + JOINING_TOO_LATE = 8 +} rejection_code_t; #endif /* NET_COMMON_H */ From 60282e52459f96f19ceefc7ccd6590c7d59ea608 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 19 Jul 2024 17:47:52 -0400 Subject: [PATCH 11/40] Hold mutex and notify downstream not upstream --- core/federated/RTI/rti_remote.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index be7af2568..f11c8218b 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1073,12 +1073,14 @@ void handle_address_ad(uint16_t federate_id) { * * This will also notify federates downstream of my_fed that this federate is now * connected. This is important when there are zero-delay cycles. + * + * This function assumes the caller holds the mutex. * * @param my_fed the federate to send the start time to. * @param federation_start_time the federation start_time * @param federate_start_tag the federate effective start tag */ -void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { +static void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. // In the startup phase, federates will receive identical start_time and @@ -1095,7 +1097,6 @@ void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, ta lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); } - LF_MUTEX_LOCK(&rti_mutex); // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP // message has been sent. 
That MSG_TYPE_TIMESTAMP message grants time advance to // the federate to the start time. @@ -1104,11 +1105,9 @@ void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, ta LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); // Notify downstream federates of this now connected transient. - for (int i = 0; i < my_fed->enclave.num_upstream; i++) { - send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.upstream[i]), my_fed); + for (int i = 0; i < my_fed->enclave.num_downstream; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); } - - LF_MUTEX_UNLOCK(&rti_mutex); } void handle_timestamp(federate_info_t* my_fed) { @@ -1154,10 +1153,11 @@ void handle_timestamp(federate_info_t* my_fed) { start_time = rti_remote->max_start_time + DELAY_START; my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; - LF_MUTEX_UNLOCK(&rti_mutex); - // Notify the federate of its start tag. + // This has to be done while still holding the mutex. 
send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + + LF_MUTEX_UNLOCK(&rti_mutex); } else if (rti_remote->phase == shutdown_phase || !my_fed->is_transient) { LF_MUTEX_UNLOCK(&rti_mutex); From 291d7fb421b7a33a3e8c0a8760b76d1e7dac3868 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 24 Jul 2024 15:44:47 +0100 Subject: [PATCH 12/40] Fix the pqueue tag iterator type --- core/utils/pqueue_tag.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c index 9a7491dd7..57e52ef5c 100644 --- a/core/utils/pqueue_tag.c +++ b/core/utils/pqueue_tag.c @@ -162,7 +162,7 @@ void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_pri tag_t pqueue_tag_max_tag(pqueue_tag_t* q) { tag_t result = NEVER_TAG; - for (int i = 1; i < q->size; i++) { + for (size_t i = 1; i < q->size; i++) { pqueue_tag_element_t* element = (pqueue_tag_element_t*)(q->d[i]); if (lf_tag_compare(element->tag, result) > 0) { result = element->tag; From 71a4cb6fba62da6de89be6fbbade9fdbf8081109 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 24 Jul 2024 15:50:10 +0100 Subject: [PATCH 13/40] Fix condition 2 for computing the effective start time of a transient + set the start time in he trace file --- core/federated/RTI/rti_remote.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index f11c8218b..2c232eb4c 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -140,7 +140,8 @@ static void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_dela * @param fed_id The federate id. * @return An entry with the specified federate if or NULL if there isn't one. 
*/ -static pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, uint16_t fed_id) { +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, + uint16_t fed_id) { pqueue_delayed_grant_element_t* dge; if (!q || q->size == 1) return NULL; @@ -1073,7 +1074,7 @@ void handle_address_ad(uint16_t federate_id) { * * This will also notify federates downstream of my_fed that this federate is now * connected. This is important when there are zero-delay cycles. - * + * * This function assumes the caller holds the mutex. * * @param my_fed the federate to send the start time to. @@ -1166,13 +1167,13 @@ void handle_timestamp(federate_info_t* my_fed) { send_reject(&my_fed->socket, JOINING_TOO_LATE); return; } else { - // The federation is transient and we are in the execution phase. + // The federate is transient and we are in the execution phase. // At this point, we already hold the mutex. //// Algorithm for computing the effective_start_time of a joining transient // The effective_start_time will be the max among all the following tags: // 1. At tag: (joining time, 0 microstep) - // 2. (start_time, 1 microstep) + // 2. (start_time, 0 microstep) // 3. The latest completed logical tag + 1 microstep // 4. The latest granted (P)TAG + 1 microstep, of every downstream federate // 5. The maximun tag of messages from the upstream federates + 1 microstep @@ -1182,7 +1183,7 @@ void handle_timestamp(federate_info_t* my_fed) { // Condition 2. if (timestamp < start_time) { - my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 1u}; + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; } // Condition 3. @@ -2462,6 +2463,11 @@ void wait_for_federates(int socket_descriptor) { // Wait for connections from persistent federates and create a thread for each. 
lf_connect_to_persistent_federates(socket_descriptor); + // Set the start_time in the RTI trace + if (rti_remote->base.tracing_enabled) { + lf_tracing_set_start_time(start_time); + } + // Set has_upstream_transient_federates parameter in all federates and check // that there is no more than one level of transiency if (rti_remote->number_of_transient_federates > 0) { From 5ef6b5476061657748355c6f4fef003a5b637a22 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 24 Jul 2024 15:53:28 +0100 Subject: [PATCH 14/40] Update lingua-franca-ref.txt --- lingua-franca-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 52199a147..23343040e 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -transient-fed +transient-fed-cycles From d41a6a420fce6fcdf8a103d66ea87be8ccf4cd37 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 27 Jul 2024 11:10:35 -0400 Subject: [PATCH 15/40] Revert to 0 microstep --- core/federated/RTI/rti_remote.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 2c232eb4c..da368be27 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1231,6 +1231,9 @@ void handle_timestamp(federate_info_t* my_fed) { // For every downstream that has a pending grant that is higher than the // effective_start_time of the federate, cancel it. + // FIXME: Should this be higher-than or equal to? + // FIXME: Also, won't the grant simply be lost? + // If the joining federate doesn't send anything, the downstream federate won't issue another NET. 
for (int j = 0; j < my_fed->enclave.num_downstream; j++) { federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.downstream[j]); From 6c20e647c4962c1288f57d545d2ce8335a2e4058 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 7 Aug 2024 19:20:15 +0100 Subject: [PATCH 16/40] Fix lf_get_federates_bin_directory() + Fix its scope, as well as lf_get_federation_id() --- core/federated/federate.c | 9 +-------- include/core/federated/federate.h | 18 ++++++++++++++++++ include/core/utils/util.h | 13 ------------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 47ad7411b..d48e4627c 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2857,14 +2857,7 @@ void lf_stop() { } char* lf_get_federates_bin_directory() { - bool bin_directory_defined = false; -#ifdef LF_FEDERATES_BIN_DIRECTORY - bin_directory_defined = true; -#endif - if (bin_directory_defined) { - return LF_FEDERATES_BIN_DIRECTORY; - } - return NULL; + return LF_SOURCE_GEN_DIRECTORY LF_FILE_SEPARATOR ".." LF_FILE_SEPARATOR ".." LF_FILE_SEPARATOR "bin"; } const char* lf_get_federation_id() { return federation_metadata.federation_id; } diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 409bdfa7e..32f69edd0 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -535,4 +535,22 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); +/** + * @brief Return the directory containing the executables of the individual + * federates. + * + * This function is useful for testing purposes only. + * Note that it assumes that all federates are running on the same machine. + * In order for a program to use this function, it needs to include "federate.h" in the preamble. + */ +char* lf_get_federates_bin_directory(); + +/** + * @brief Returns the federation id. 
+ * + * This function is useful for testing purposes only. + * In order for a program to use this function, it needs to include "federate.h" in the preamble. + */ +const char* lf_get_federation_id(); + #endif // FEDERATE_H diff --git a/include/core/utils/util.h b/include/core/utils/util.h index eb153f1f4..744c3579b 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -204,17 +204,4 @@ void lf_vprint_error_and_exit(const char* format, va_list args) ATTRIBUTE_FORMAT */ void lf_stop(); -/** - * @brief Return the directory containing the executables of the individual - * federates. - */ -char* lf_get_federates_bin_directory(); - -/** - * @brief Returns the federation id. - * - * This function is useful for creating federates on runtime. - */ -const char* lf_get_federation_id(); - #endif /* UTIL_H */ From 286931d56b5afcbda4da1367099b5992e14f92c8 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 7 Aug 2024 19:41:33 +0100 Subject: [PATCH 17/40] Remove no more needed LF_FEDERATED_BIN_DIRECTORY --- core/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 04443d5ab..6d938ae0c 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -181,7 +181,6 @@ define(SCHEDULER) define(LF_SOURCE_DIRECTORY) define(LF_SOURCE_GEN_DIRECTORY) define(LF_PACKAGE_DIRECTORY) -define(LF_FEDERATES_BIN_DIRECTORY) define(LF_FILE_SEPARATOR) define(WORKERS_NEEDED_FOR_FEDERATE) define(LF_ENCLAVES) From 6b55db0e4fefc794213edcce7dec7bbcf8a82903 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 7 Aug 2024 19:41:53 +0100 Subject: [PATCH 18/40] Run Clang formatter --- include/core/federated/network/net_common.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 38df7c460..c93d25216 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h 
@@ -716,14 +716,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * They are limited to one byte (uchar). */ typedef enum { - FEDERATION_ID_DOES_NOT_MATCH = 1, - FEDERATE_ID_IN_USE = 2, - FEDERATE_ID_OUT_OF_RANGE = 3, - UNEXPECTED_MESSAGE = 4, - WRONG_SERVER = 5, - HMAC_DOES_NOT_MATCH = 6, - RTI_NOT_EXECUTED_WITH_AUTH = 7, - JOINING_TOO_LATE = 8 + FEDERATION_ID_DOES_NOT_MATCH = 1, + FEDERATE_ID_IN_USE = 2, + FEDERATE_ID_OUT_OF_RANGE = 3, + UNEXPECTED_MESSAGE = 4, + WRONG_SERVER = 5, + HMAC_DOES_NOT_MATCH = 6, + RTI_NOT_EXECUTED_WITH_AUTH = 7, + JOINING_TOO_LATE = 8 } rejection_code_t; #endif /* NET_COMMON_H */ From e8c930d331061fd5fcea8dd105435a511420d15a Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 14 Aug 2024 07:49:03 +0100 Subject: [PATCH 19/40] Remove overlooked code when merging --- core/federated/federate.c | 37 ------------------------------------- 1 file changed, 37 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 1600bd4a9..2f9a260b8 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2891,43 +2891,6 @@ char* lf_get_federates_bin_directory() { const char* lf_get_federation_id() { return federation_metadata.federation_id; } -void lf_stop() { - environment_t* env; - int num_env = _lf_get_environments(&env); - - for (int i = 0; i < num_env; i++) { - LF_MUTEX_LOCK(&env[i].mutex); - - tag_t new_stop_tag; - new_stop_tag.time = env[i].current_tag.time; - new_stop_tag.microstep = env[i].current_tag.microstep + 1; - - lf_set_stop_tag(&env[i], new_stop_tag); - - lf_print("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, - env[i].stop_tag.microstep); - - if (env[i].barrier.requestors) - _lf_decrement_tag_barrier_locked(&env[i]); - lf_cond_broadcast(&env[i].event_q_changed); - LF_MUTEX_UNLOCK(&env[i].mutex); - } - LF_PRINT_LOG("Federate is stopping."); -} - -char* lf_get_federates_bin_directory() { - bool 
bin_directory_defined = false; -#ifdef LF_FEDERATES_BIN_DIRECTORY - bin_directory_defined = true; -#endif - if (bin_directory_defined) { - return (LF_FEDERATES_BIN_DIRECTORY); - } - return NULL; -} - -const char* lf_get_federation_id() { return federation_metadata.federation_id; } - #ifdef FEDERATED_DECENTRALIZED instant_t lf_wait_until_time(tag_t tag) { instant_t result = tag.time; // Default. From 960307e47eb5818f94c2e3e84050c09b147a482c Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 21 Aug 2024 11:26:01 +0100 Subject: [PATCH 20/40] Handle corner cases where connection messages about transients are received before the start time --- core/federated/federate.c | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 2f9a260b8..806ff5a59 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -995,17 +995,34 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { size_t buffer_length = MSG_TYPE_TIMESTAMP_START_LENGTH; unsigned char buffer[buffer_length]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length, buffer, NULL, - "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); - LF_PRINT_DEBUG("Read 21 bytes."); - - // First byte received is the message ID. - if (buffer[0] != MSG_TYPE_TIMESTAMP_START) { - if (buffer[0] == MSG_TYPE_FAILED) { - lf_print_error_and_exit("RTI has failed."); + while (true) { + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, 1, buffer, NULL, + "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); + // First byte received is the message ID. 
+ if (buffer[0] != MSG_TYPE_TIMESTAMP_START) { + if (buffer[0] == MSG_TYPE_FAILED) { + lf_print_error_and_exit("RTI has failed."); + } else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) { + // We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + // FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set? + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH - 1, buffer + 1, NULL, + "Failed to complete reading MSG_TYPE_UPSTREAM_CONNECTED."); + continue; + } else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) { + // We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + // FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set? + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH - 1, buffer + 1, + NULL, "Failed to complete reading MSG_TYPE_UPSTREAM_DISCONNECTED."); + continue; + } else { + lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).", + buffer[0]); + } + } else { + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length - 1, buffer + 1, NULL, + "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); + break; } - lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. 
Got %u (see net_common.h).", - buffer[0]); } instant_t timestamp = extract_int64(&(buffer[1])); From d543cc7e047193569705fb67406129fc89493db7 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 21 Aug 2024 11:28:15 +0100 Subject: [PATCH 21/40] Reset timing info from previous runs --- core/federated/RTI/rti_remote.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 2c71b0224..1804e9ece 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -2398,6 +2398,11 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { } void reset_transient_federate(federate_info_t* fed) { + // Reset all the timing information from the previous run + fed->enclave.completed = NEVER_TAG; + fed->enclave.last_granted = NEVER_TAG; + fed->enclave.last_provisionally_granted = NEVER_TAG; + fed->enclave.next_event = NEVER_TAG; // Reset of the federate-related attributes fed->socket = -1; // No socket. fed->clock_synchronization_enabled = true; From 29815425b2835b06920293f9fe326d8a59be5501 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 21 Aug 2024 11:56:09 +0100 Subject: [PATCH 22/40] Skip checking the satet in the first call of _update_min_delays_upsteam --- core/federated/RTI/rti_common.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index f1229493f..d0f063652 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -305,9 +305,10 @@ static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_ // Not the first call, so intermediate is upstream of end. delay_from_intermediate_so_far = path_delays[intermediate->id]; } - if (intermediate->state == NOT_CONNECTED) { + if (intermediate->state == NOT_CONNECTED && end->id != intermediate->id) { // Enclave or federate is not connected. // No point in checking upstream scheduling_nodes. 
+ // Skip the first call return; } // Check nodes upstream of intermediate (or end on first call). From 7b7c01f4106a09b13647d29ced6ebf8611119183 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 28 Aug 2024 14:23:54 +0100 Subject: [PATCH 23/40] Invalidate min delays of all federates once a tansient joins --- core/federated/RTI/rti_remote.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 1804e9ece..9f75090a3 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1258,6 +1258,14 @@ void handle_timestamp(federate_info_t* my_fed) { // from an upstream federate is forwarded before the start tag. send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + // Whenver a transient joins, invalidate all federates, so that all min_delays_upstream + // get re-computed. + // FIXME: Needs to be optimized to only invalidate those affected by the transient + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + invalidate_min_delays_upstream(&(fed->enclave)); + } + LF_MUTEX_UNLOCK(&rti_mutex); } } @@ -2413,7 +2421,7 @@ void reset_transient_federate(federate_info_t* fed) { fed->server_ip_addr.s_addr = 0; fed->server_port = -1; fed->requested_stop = false; - invalidate_min_delays_upstream(&(fed->enclave)); + // invalidate_all_min_delays(); } int32_t start_rti_server(uint16_t port) { From 859a0780a26197a205008d5fe20600b5c7c8f086 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 28 Aug 2024 14:27:03 +0100 Subject: [PATCH 24/40] Do not skip the node itself in _updat_min_delays --- core/federated/RTI/rti_common.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index d0f063652..f1229493f 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -305,10 +305,9 @@ 
static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_ // Not the first call, so intermediate is upstream of end. delay_from_intermediate_so_far = path_delays[intermediate->id]; } - if (intermediate->state == NOT_CONNECTED && end->id != intermediate->id) { + if (intermediate->state == NOT_CONNECTED) { // Enclave or federate is not connected. // No point in checking upstream scheduling_nodes. - // Skip the first call return; } // Check nodes upstream of intermediate (or end on first call). From 5653cece4924cd1658c8dd2f2f2540ed5305126b Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 2 Dec 2024 09:05:33 +0100 Subject: [PATCH 25/40] Removed duplicated function --- core/federated/federate.c | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 002c9f1ce..734529c71 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -323,34 +323,6 @@ static void mark_inputs_known_absent(int fed_id) { #endif // FEDERATED_DECENTRALIZED } -/** - * @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER. - * - * This does nothing if the federate is not using decentralized coordination. - * This function acquires the mutex on the top-level environment. - * @param fed_id The ID of the federate. - */ -static void mark_inputs_known_absent(int fed_id) { -#ifdef FEDERATED_DECENTRALIZED - // Note that when transient federates are supported, this will need to be updated because the - // federate could rejoin. - environment_t* env; - _lf_get_environments(&env); - LF_MUTEX_LOCK(&env->mutex); - - for (size_t i = 0; i < _lf_action_table_size; i++) { - lf_action_base_t* action = _lf_action_table[i]; - if (action->source_id == fed_id) { - update_last_known_status_on_input_port(env, FOREVER_TAG, i); - } - } - LF_MUTEX_UNLOCK(&env->mutex); -#else - // Do nothing, except suppress unused parameter error. 
- (void)fed_id; -#endif // FEDERATED_DECENTRALIZED -} - /** * @brief Update the last known status tag of a network input action. * From 2e9fa982b44791b8069217f34871b67974c52abd Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 2 Dec 2024 09:32:59 +0100 Subject: [PATCH 26/40] Formatting --- include/core/utils/pqueue_tag.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h index dec61a5cd..907428cac 100644 --- a/include/core/utils/pqueue_tag.h +++ b/include/core/utils/pqueue_tag.h @@ -218,7 +218,7 @@ void pqueue_tag_dump(pqueue_tag_t* q); /** * @brief Return the maximum tag in the queue or NEVER_TAG if the queue is empty. - * + * * @param q The queue. */ tag_t pqueue_tag_max_tag(pqueue_tag_t* q); From 41442f1dfacebd05f4ac6bca181311ddcc85aa2c Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 2 Dec 2024 10:12:41 +0100 Subject: [PATCH 27/40] Attempt to pass tests by manually adding prototypes to lf code --- include/core/federated/federate.h | 34 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index afa94896d..02b6495ef 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -530,23 +530,23 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); -/** - * @brief Return the directory containing the executables of the individual - * federates. - * - * This function is useful for testing purposes only. - * Note that it assumes that all federates are running on the same machine. - * In order for a program to use this function, it needs to include "federate.h" in the preamble. - */ -char* lf_get_federates_bin_directory(); - -/** - * @brief Returns the federation id. - * - * This function is useful for testing purposes only. 
- * In order for a program to use this function, it needs to include "federate.h" in the preamble. - */ -const char* lf_get_federation_id(); +// /** +// * @brief Return the directory containing the executables of the individual +// * federates. +// * +// * This function is useful for testing purposes only. +// * Note that it assumes that all federates are running on the same machine. +// * In order for a program to use this function, it needs to include "federate.h" in the preamble. +// */ +// char* lf_get_federates_bin_directory(); + +// /** +// * @brief Returns the federation id. +// * +// * This function is useful for testing purposes only. +// * In order for a program to use this function, it needs to include "federate.h" in the preamble. +// */ +// const char* lf_get_federation_id(); #ifdef FEDERATED_DECENTRALIZED /** From b189d360b3fcbedb8f37ee97e78133cc70ae5fd5 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 2 Dec 2024 11:48:19 -0800 Subject: [PATCH 28/40] Removed lf_get_federates_bin_directory. Use LF_FED_PACKAGE_DIRECTORY --- core/federated/federate.c | 4 ---- include/core/federated/federate.h | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 734529c71..d18f96ec2 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2929,10 +2929,6 @@ void lf_stop() { LF_PRINT_LOG("Federate is stopping."); } -char* lf_get_federates_bin_directory() { - return LF_SOURCE_GEN_DIRECTORY LF_FILE_SEPARATOR ".." LF_FILE_SEPARATOR ".." 
LF_FILE_SEPARATOR "bin"; -} - const char* lf_get_federation_id() { return federation_metadata.federation_id; } #ifdef FEDERATED_DECENTRALIZED diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 02b6495ef..be72ae675 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -530,16 +530,6 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); -// /** -// * @brief Return the directory containing the executables of the individual -// * federates. -// * -// * This function is useful for testing purposes only. -// * Note that it assumes that all federates are running on the same machine. -// * In order for a program to use this function, it needs to include "federate.h" in the preamble. -// */ -// char* lf_get_federates_bin_directory(); - // /** // * @brief Returns the federation id. // * From 3318bb85a9bebc634277f358dd94e4d43d68ec18 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 4 Dec 2024 14:11:56 +0100 Subject: [PATCH 29/40] Do not account for absent trnsients when calculating efimt --- core/federated/RTI/rti_common.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index f1229493f..fd9b01ee3 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -94,6 +94,8 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { for (int i = 0; i < e->num_min_delays; i++) { // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. scheduling_node_t* upstream = rti_common->scheduling_nodes[e->min_delays[i].id]; + if (upstream->state == NOT_CONNECTED) + continue; // If we haven't heard from the upstream node, then assume it can send an event at the start time. 
if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { tag_t start_tag = {.time = start_time, .microstep = 0}; From 83bf6c4f191e5d5cda09aea0e54ccd5c604b82e0 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 8 Dec 2024 15:23:29 -0800 Subject: [PATCH 30/40] Restored declaration of lf_get_federation_id --- include/core/federated/federate.h | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index be72ae675..e7f99eaa9 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -477,6 +477,11 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int */ void lf_set_federation_id(const char* fid); +/** + * @brief Return the federation id. + */ +const char* lf_get_federation_id(); + #ifdef FEDERATED_DECENTRALIZED /** * @brief Spawn a thread to iterate through STAA structs. @@ -530,14 +535,6 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); -// /** -// * @brief Returns the federation id. -// * -// * This function is useful for testing purposes only. -// * In order for a program to use this function, it needs to include "federate.h" in the preamble. -// */ -// const char* lf_get_federation_id(); - #ifdef FEDERATED_DECENTRALIZED /** * @brief Return the physical time that we should wait until before advancing to the specified tag. 
From 782d4927add68fad4671522a3e2f266f3cb8b07b Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 9 Dec 2024 13:07:42 +0100 Subject: [PATCH 31/40] Attempt to make sure a persistent knows that a transient joined --- core/federated/RTI/rti_remote.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 9f75090a3..bd0f1fc45 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1109,6 +1109,15 @@ static void send_start_tag(federate_info_t* my_fed, instant_t federation_start_t for (int i = 0; i < my_fed->enclave.num_downstream; i++) { send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); } + + // A corner case was identified where a transient joins at tag (0, 0) and one of its persistent downstreams misses the + // notification. The following is an attempt to make sure it is notified. + for (int i = 0; i < my_fed->enclave.num_upstream; i++) { + federate_info_t* fed = GET_FED_INFO(my_fed->enclave.upstream[i]); + if (fed->is_transient && fed->enclave.state == GRANTED) { + send_upstream_connected_locked(my_fed, fed); + } + } } void handle_timestamp(federate_info_t* my_fed) { From 844ffb0eb657c588a84ded697e79a3029110f0c3 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Tue, 10 Dec 2024 15:13:30 +0100 Subject: [PATCH 32/40] Fix send_start_tag() function name, comments, and actions ordering --- core/federated/RTI/rti_remote.c | 77 ++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index bd0f1fc45..277d26744 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -344,22 +344,23 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { } /** - * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified federate. 
+ * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified federate, only if it is connected. * * This function assumes that the mutex lock is already held. * @param destination The destination federate. * @param disconnected The connected federate. */ static void send_upstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { - if (!connected->is_transient) { - // No need to send connected message for persistent federates. + if (destination->enclave.state == NOT_CONNECTED) { + LF_PRINT_LOG("RTI did not send upstream connected message to federate %d, because it is not connected.", + destination->enclave.id); return; } unsigned char buffer[MSG_TYPE_UPSTREAM_CONNECTED_LENGTH]; buffer[0] = MSG_TYPE_UPSTREAM_CONNECTED; encode_uint16(connected->enclave.id, &buffer[1]); if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { - lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", connected->enclave.id); + lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", destination->enclave.id); } } @@ -1063,32 +1064,52 @@ void handle_address_ad(uint16_t federate_id) { } /** - * Send to the start time to the federate my_fed. - * This function assumes the caller does not hold the mutex. + * Send the start time and tag to the federate my_fed. * - * If it is the startup phase, the start_time will be the maximum received timestamps - * plus an offset. The federate will then receive identical federation_start_time - * and federate_start_tag.time (the federate_start_tag.microstep will be 0). - * If, however, the startup phase is passed, the federate will receive different - * values than stated above. + * During the startup phase, the start_time is calculated as the maximum received timestamp, plus an offset. 
+ * The federate will then receive identical values for federation_start_time` and `federate_start_tag.time` (with + * `federate_start_tag.microstep` set to 0). After the startup phase, the federate will receive different values for + * these parameters. * - * This will also notify federates downstream of my_fed that this federate is now - * connected. This is important when there are zero-delay cycles. + * Before sending the start time and tag, this function performs the following actions: + * - If my_fed is transient, notify federates downstream of its connection, ensuring proper handling of zero-delay + * cycles. + * - Notify my_fed of all upstream transient federates that are connected. * - * This function assumes the caller holds the mutex. + * This function assumes that the mutex lock is already held. * * @param my_fed the federate to send the start time to. * @param federation_start_time the federation start_time * @param federate_start_tag the federate effective start tag */ -static void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { +static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { + // If this is a transient federate, notify its downstream federates that it is now connected. + if (my_fed->is_transient) { + for (int i = 0; i < my_fed->enclave.num_downstream; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); + } + } + // A corner case occurs when an upstream transient joins at tag (0, 0), but `my_fed` + // either misses the notification or receives it late. The following ensures that + // `my_fed` is informed of all currently connected upstream transients. + // This also prevents `my_fed` from receiving the start time and starting execution + // before the upstream connection message is received. 
+ // This also deals with an even less likely corner case where two transients are joining simultaneously and one is + // upstream of the other. + for (int i = 0; i < my_fed->enclave.num_upstream; i++) { + federate_info_t* fed = GET_FED_INFO(my_fed->enclave.upstream[i]); + if (fed->is_transient && fed->enclave.state == GRANTED) { + send_upstream_connected_locked(my_fed, fed); + } + } + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START // message. // In the startup phase, federates will receive identical start_time and // effective_start_tag unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_START_LENGTH]; start_time_buffer[0] = MSG_TYPE_TIMESTAMP_START; - encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]); + encode_int64(swap_bytes_if_big_endian_int64(federation_start_time), &start_time_buffer[1]); encode_tag(&(start_time_buffer[1 + sizeof(instant_t)]), federate_start_tag); if (rti_remote->base.tracing_enabled) { @@ -1098,26 +1119,12 @@ static void send_start_tag(federate_info_t* my_fed, instant_t federation_start_t lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); } - // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP - // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to - // the federate to the start time. + // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP_START + // message has been sent. That MSG_TYPE_TIMESTAMP_START message grants time advance to + // the federate to the federate_start_tag.time. my_fed->enclave.state = GRANTED; lf_cond_broadcast(&sent_start_time); LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); - - // Notify downstream federates of this now connected transient. 
- for (int i = 0; i < my_fed->enclave.num_downstream; i++) { - send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); - } - - // A corner case was identified where a transient joins at tag (0, 0) and one of its persistent downstreams misses the - // notification. The following is an attempt to make sure it is notified. - for (int i = 0; i < my_fed->enclave.num_upstream; i++) { - federate_info_t* fed = GET_FED_INFO(my_fed->enclave.upstream[i]); - if (fed->is_transient && fed->enclave.state == GRANTED) { - send_upstream_connected_locked(my_fed, fed); - } - } } void handle_timestamp(federate_info_t* my_fed) { @@ -1165,7 +1172,7 @@ void handle_timestamp(federate_info_t* my_fed) { // Notify the federate of its start tag. // This has to be done while still holding the mutex. - send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); LF_MUTEX_UNLOCK(&rti_mutex); } else if (rti_remote->phase == shutdown_phase || !my_fed->is_transient) { @@ -1265,7 +1272,7 @@ void handle_timestamp(federate_info_t* my_fed) { // Have to send the start tag while still holding the mutex to ensure that no message // from an upstream federate is forwarded before the start tag. - send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); // Whenver a transient joins, invalidate all federates, so that all min_delays_upstream // get re-computed. 
From b7f3d5c465b9a1e5284afe45301f232fff8f157c Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 11 Dec 2024 12:18:52 +0100 Subject: [PATCH 33/40] Fix message ordering in send_start_tag_locked() + apply suggestions to improve comments --- core/federated/RTI/rti_remote.c | 58 ++++++++++++++++----------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 277d26744..e431abbc0 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -344,7 +344,8 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) { } /** - * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified federate, only if it is connected. + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI, + * telling it that the specified `upstream` federate is also now connected. * * This function assumes that the mutex lock is already held. * @param destination The destination federate. @@ -1064,17 +1065,17 @@ void handle_address_ad(uint16_t federate_id) { } /** - * Send the start time and tag to the federate my_fed. + * @brief Send the global federation start time and the federate-specific starting tag to the specified federate. * - * During the startup phase, the start_time is calculated as the maximum received timestamp, plus an offset. - * The federate will then receive identical values for federation_start_time` and `federate_start_tag.time` (with - * `federate_start_tag.microstep` set to 0). After the startup phase, the federate will receive different values for - * these parameters. + * For persistent federates and transient federates that happen to join during federation startup, the + * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0. 
+ * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the + * `federation_start_time`. * - * Before sending the start time and tag, this function performs the following actions: - * - If my_fed is transient, notify federates downstream of its connection, ensuring proper handling of zero-delay - * cycles. - * - Notify my_fed of all upstream transient federates that are connected. + * + * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are + * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its + * connection, ensuring proper handling of zero-delay cycles. * * This function assumes that the mutex lock is already held. * @@ -1083,19 +1084,9 @@ void handle_address_ad(uint16_t federate_id) { * @param federate_start_tag the federate effective start tag */ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { - // If this is a transient federate, notify its downstream federates that it is now connected. - if (my_fed->is_transient) { - for (int i = 0; i < my_fed->enclave.num_downstream; i++) { - send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); - } - } - // A corner case occurs when an upstream transient joins at tag (0, 0), but `my_fed` - // either misses the notification or receives it late. The following ensures that - // `my_fed` is informed of all currently connected upstream transients. - // This also prevents `my_fed` from receiving the start time and starting execution - // before the upstream connection message is received. - // This also deals with an even less likely corner case where two transients are joining simultaneously and one is - // upstream of the other. + // Notify my_fed of any upstream transient federates that are connected.
+ // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these + // upstream federates are not connected. for (int i = 0; i < my_fed->enclave.num_upstream; i++) { federate_info_t* fed = GET_FED_INFO(my_fed->enclave.upstream[i]); if (fed->is_transient && fed->enclave.state == GRANTED) { @@ -1117,14 +1108,21 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_ } if (write_to_socket(my_fed->socket, MSG_TYPE_TIMESTAMP_START_LENGTH, start_time_buffer)) { lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); + } else { + // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP_START + // message has been sent. That MSG_TYPE_TIMESTAMP_START message grants time advance to + // the federate to the federate_start_tag.time. + my_fed->enclave.state = GRANTED; + lf_cond_broadcast(&sent_start_time); + LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); + + // If this is a transient federate, notify its downstream federates that it is now connected. + if (my_fed->is_transient) { + for (int i = 0; i < my_fed->enclave.num_downstream; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed); + } + } } - - // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP_START - // message has been sent. That MSG_TYPE_TIMESTAMP_START message grants time advance to - // the federate to the federate_start_tag.time. 
- my_fed->enclave.state = GRANTED; - lf_cond_broadcast(&sent_start_time); - LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); } void handle_timestamp(federate_info_t* my_fed) { From bb67e6311fab6ee664e72354bda6b5688f10763c Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 11 Dec 2024 14:33:15 +0100 Subject: [PATCH 34/40] Allow federate to handle that an upstream has connected or disconnected even before receiving the start time --- core/federated/federate.c | 94 +++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index aaa958c2e..f9431fa64 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1004,6 +1004,44 @@ static void rti_address(const char* hostname, uint16_t port, struct addrinfo** r } } +/** + * @brief Handle message from the RTI that an upstream federate has connected. + * + */ +static void handle_upstream_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream connected message from RTI."); + uint16_t connected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); + // Mark the upstream as connected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = false; + } + } +} + +/** + * @brief Handle message from the RTI that an upstream federate has disconnected. 
+ * + */ +static void handle_upstream_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream disconnected message from RTI."); + uint16_t disconnected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); + // Mark the upstream as disconnected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = true; + } + } +} + /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -1030,16 +1068,12 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { if (buffer[0] == MSG_TYPE_FAILED) { lf_print_error_and_exit("RTI has failed."); } else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) { - // We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive - // FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set? - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH - 1, buffer + 1, NULL, - "Failed to complete reading MSG_TYPE_UPSTREAM_CONNECTED."); + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + handle_upstream_connected_message(); continue; } else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) { - // We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive - // FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set? 
- read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH - 1, buffer + 1, - NULL, "Failed to complete reading MSG_TYPE_UPSTREAM_DISCONNECTED."); + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + handle_upstream_disconnected_message(); continue; } else { lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).", @@ -1596,44 +1630,6 @@ static void send_failed_signal() { */ static void handle_rti_failed_message(void) { exit(1); } -/** - * @brief Handle message from the RTI that an upstream federate has connected. - * - */ -static void handle_upstream_connected_message(void) { - size_t bytes_to_read = sizeof(uint16_t); - unsigned char buffer[bytes_to_read]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, - "Failed to read upstream connected message from RTI."); - uint16_t connected = extract_uint16(buffer); - LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); - // Mark the upstream as connected. - for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { - if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { - _lf_zero_delay_cycle_upstream_disconnected[i] = false; - } - } -} - -/** - * @brief Handle message from the RTI that an upstream federate has disconnected. - * - */ -static void handle_upstream_disconnected_message(void) { - size_t bytes_to_read = sizeof(uint16_t); - unsigned char buffer[bytes_to_read]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, - "Failed to read upstream disconnected message from RTI."); - uint16_t disconnected = extract_uint16(buffer); - LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); - // Mark the upstream as disconnected. 
- for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { - if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { - _lf_zero_delay_cycle_upstream_disconnected[i] = true; - } - } -} - /** * Thread that listens for TCP inputs from the RTI. * When messages arrive, this calls the appropriate handler. @@ -2070,9 +2066,9 @@ void lf_connect_to_rti(const char* hostname, int port) { if (result < 0) continue; // Connect failed. - // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. - // Notify the RTI of the ID of this federate and its federation. + // Have connected to an RTI, but not sure it's the right RTI. + // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Notify the RTI of the ID of this federate and its federation. #ifdef FEDERATED_AUTHENTICATED LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID."); From 173bfc3aa2d0d8844f68202b2ae2ee1a24848999 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 11 Dec 2024 14:38:56 +0100 Subject: [PATCH 35/40] Format?! 
--- include/core/utils/impl/hashmap.h | 2 +- include/core/utils/impl/pointer_hashmap.h | 2 +- tag/api/tag.h | 9 ++++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/include/core/utils/impl/hashmap.h b/include/core/utils/impl/hashmap.h index 94d5969a7..e64774887 100644 --- a/include/core/utils/impl/hashmap.h +++ b/include/core/utils/impl/hashmap.h @@ -19,7 +19,7 @@ #define V void* #endif #ifndef HASH_OF -#define HASH_OF(key) (size_t)key +#define HASH_OF(key) (size_t) key #endif #ifndef HASHMAP #define HASHMAP(token) hashmap##_##token diff --git a/include/core/utils/impl/pointer_hashmap.h b/include/core/utils/impl/pointer_hashmap.h index c2a60aef1..2184518b3 100644 --- a/include/core/utils/impl/pointer_hashmap.h +++ b/include/core/utils/impl/pointer_hashmap.h @@ -30,7 +30,7 @@ #define HASHMAP(token) hashmap_object2int##_##token #define K void* #define V int -#define HASH_OF(key) (size_t)key +#define HASH_OF(key) (size_t) key #include "hashmap.h" #undef HASHMAP #undef K diff --git a/tag/api/tag.h b/tag/api/tag.h index 2784e1c84..9b1a3f9ad 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -37,12 +37,15 @@ #define NEVER_TAG \ (tag_t) { .time = NEVER, .microstep = NEVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define NEVER_TAG_INITIALIZER {NEVER, NEVER_MICROSTEP} +#define NEVER_TAG_INITIALIZER \ + { NEVER, NEVER_MICROSTEP } #define FOREVER_TAG \ (tag_t) { .time = FOREVER, .microstep = FOREVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define FOREVER_TAG_INITIALIZER {FOREVER, FOREVER_MICROSTEP} -#define ZERO_TAG (tag_t){.time = 0LL, .microstep = 0u} +#define FOREVER_TAG_INITIALIZER \ + { FOREVER, FOREVER_MICROSTEP } +#define ZERO_TAG \ + (tag_t) { .time = 0LL, .microstep = 0u } // Returns true if timeout has elapsed. 
#define CHECK_TIMEOUT(start, duration) (lf_time_physical() > ((start) + (duration))) From 6ad1897824c4c98c3d968b35c855469c04933281 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 11 Dec 2024 15:34:12 +0100 Subject: [PATCH 36/40] Formatting using clang-format-19 --- core/federated/federate.c | 6 +++--- include/core/utils/impl/hashmap.h | 2 +- include/core/utils/impl/pointer_hashmap.h | 2 +- tag/api/tag.h | 9 +++------ 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index f9431fa64..5ae6cc38e 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2066,9 +2066,9 @@ void lf_connect_to_rti(const char* hostname, int port) { if (result < 0) continue; // Connect failed. - // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. - // Notify the RTI of the ID of this federate and its federation. + // Have connected to an RTI, but not sure it's the right RTI. + // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Notify the RTI of the ID of this federate and its federation. #ifdef FEDERATED_AUTHENTICATED LF_PRINT_LOG("Connected to an RTI. 
Performing HMAC-based authentication using federation ID."); diff --git a/include/core/utils/impl/hashmap.h b/include/core/utils/impl/hashmap.h index e64774887..94d5969a7 100644 --- a/include/core/utils/impl/hashmap.h +++ b/include/core/utils/impl/hashmap.h @@ -19,7 +19,7 @@ #define V void* #endif #ifndef HASH_OF -#define HASH_OF(key) (size_t) key +#define HASH_OF(key) (size_t)key #endif #ifndef HASHMAP #define HASHMAP(token) hashmap##_##token diff --git a/include/core/utils/impl/pointer_hashmap.h b/include/core/utils/impl/pointer_hashmap.h index 2184518b3..c2a60aef1 100644 --- a/include/core/utils/impl/pointer_hashmap.h +++ b/include/core/utils/impl/pointer_hashmap.h @@ -30,7 +30,7 @@ #define HASHMAP(token) hashmap_object2int##_##token #define K void* #define V int -#define HASH_OF(key) (size_t) key +#define HASH_OF(key) (size_t)key #include "hashmap.h" #undef HASHMAP #undef K diff --git a/tag/api/tag.h b/tag/api/tag.h index 9b1a3f9ad..2784e1c84 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -37,15 +37,12 @@ #define NEVER_TAG \ (tag_t) { .time = NEVER, .microstep = NEVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define NEVER_TAG_INITIALIZER \ - { NEVER, NEVER_MICROSTEP } +#define NEVER_TAG_INITIALIZER {NEVER, NEVER_MICROSTEP} #define FOREVER_TAG \ (tag_t) { .time = FOREVER, .microstep = FOREVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define FOREVER_TAG_INITIALIZER \ - { FOREVER, FOREVER_MICROSTEP } -#define ZERO_TAG \ - (tag_t) { .time = 0LL, .microstep = 0u } +#define FOREVER_TAG_INITIALIZER {FOREVER, FOREVER_MICROSTEP} +#define ZERO_TAG (tag_t){.time = 0LL, .microstep = 0u} // Returns true if timeout has elapsed. 
#define CHECK_TIMEOUT(start, duration) (lf_time_physical() > ((start) + (duration))) From 9053feb831f7cfae87168b2c36393567d0f448a1 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Wed, 11 Dec 2024 22:47:44 +0100 Subject: [PATCH 37/40] Fix dropping the message when a transient's effective start tag is not yet reached. --- core/federated/RTI/rti_remote.c | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index e431abbc0..d1e962da2 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -691,10 +691,26 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff LF_MUTEX_UNLOCK(&rti_mutex); return; } else { + // Do not forward the message if the federate is connected, but its + // start_time is not reached yet if (lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) { - // Do not forward the message if the federate is connected, but its - // start_time is not reached yet - lf_mutex_unlock(&rti_mutex); + LF_PRINT_LOG("RTI: Effective start tag of the destination federate %d (" PRINTF_TAG "), " + "is not reached yet, while the received message tag is ()" PRINTF_TAG "). " + "Dropping message.", + federate_id, fed->effective_start_tag.time - start_time, fed->effective_start_tag.microstep, + intended_tag.time - start_time, intended_tag.microstep); + // Similarly, if the message was larger than the buffer, we must empty out the remainder also. 
+ size_t total_bytes_read = bytes_read; + while (total_bytes_read < total_bytes_to_read) { + bytes_to_read = total_bytes_to_read - total_bytes_read; + if (bytes_to_read > FED_COM_BUFFER_SIZE) { + bytes_to_read = FED_COM_BUFFER_SIZE; + } + read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL, + "RTI failed to clear message chunks."); + total_bytes_read += bytes_to_read; + } + LF_MUTEX_UNLOCK(&rti_mutex); return; } } @@ -2435,6 +2451,7 @@ void reset_transient_federate(federate_info_t* fed) { fed->server_ip_addr.s_addr = 0; fed->server_port = -1; fed->requested_stop = false; + fed->effective_start_tag = NEVER_TAG; // invalidate_all_min_delays(); } From 34d51b3c6b6d22d6f02b596434f6a5bfe2540cb8 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Fri, 13 Dec 2024 06:57:56 +0100 Subject: [PATCH 38/40] Simplify the code of message dropping in the remote RTI. --- core/federated/RTI/rti_remote.c | 43 ++++++--------------------------- 1 file changed, 7 insertions(+), 36 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index d1e962da2..c1f8dfada 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -663,20 +663,14 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff // issue a TAG before this message has been forwarded. LF_MUTEX_LOCK(&rti_mutex); - // If the destination federate is no longer connected, issue a warning, - // remove the message from the socket and return. + // If the destination federate is no longer connected, or it is a transient that has not started executing yet + // (the intended tag is less than the effective start tag of the destination), issue a warning, remove the message + // from the socket, and return. federate_info_t* fed = GET_FED_INFO(federate_id); - if (fed->enclave.state == NOT_CONNECTED) { - lf_print_warning("RTI: Destination federate %d is no longer connected. 
Dropping message.", federate_id); - LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", " - "completed " PRINTF_TAG ", " - "last_granted " PRINTF_TAG ", " - "last_provisionally_granted " PRINTF_TAG ".", - fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep, - fed->enclave.completed.time - start_time, fed->enclave.completed.microstep, - fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep, - fed->enclave.last_provisionally_granted.time - start_time, - fed->enclave.last_provisionally_granted.microstep); + if (fed->enclave.state == NOT_CONNECTED || lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) { + lf_print_warning("RTI: Destination federate %d is not connected at logical time (" PRINTF_TAG + "). Dropping message.", + federate_id, intended_tag.time - start_time, intended_tag.microstep); // If the message was larger than the buffer, we must empty out the remainder also. size_t total_bytes_read = bytes_read; while (total_bytes_read < total_bytes_to_read) { @@ -690,29 +684,6 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff } LF_MUTEX_UNLOCK(&rti_mutex); return; - } else { - // Do not forward the message if the federate is connected, but its - // start_time is not reached yet - if (lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) { - LF_PRINT_LOG("RTI: Effective start tag of the destination federate %d (" PRINTF_TAG "), " - "is not reached yet, while the received message tag is ()" PRINTF_TAG "). " - "Dropping message.", - federate_id, fed->effective_start_tag.time - start_time, fed->effective_start_tag.microstep, - intended_tag.time - start_time, intended_tag.microstep); - // Similarly, if the message was larger than the buffer, we must empty out the remainder also. 
- size_t total_bytes_read = bytes_read; - while (total_bytes_read < total_bytes_to_read) { - bytes_to_read = total_bytes_to_read - total_bytes_read; - if (bytes_to_read > FED_COM_BUFFER_SIZE) { - bytes_to_read = FED_COM_BUFFER_SIZE; - } - read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL, - "RTI failed to clear message chunks."); - total_bytes_read += bytes_to_read; - } - LF_MUTEX_UNLOCK(&rti_mutex); - return; - } } LF_PRINT_DEBUG("RTI forwarding message to port %d of federate %hu of length %zu.", reactor_port_id, federate_id, From 793fc9a62378624e39ec52499dbae174146fe1ba Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 14 Dec 2024 12:10:53 -0800 Subject: [PATCH 39/40] Do not exit if there is no timeout, fast is true, and keepalive is true --- core/threaded/reactor_threaded.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 93168c287..493bd5a3e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -195,7 +195,7 @@ void lf_set_present(lf_port_base_t* port) { } bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { - if (!fast) { + if (!fast || (wait_until_time == FOREVER && keepalive_specified)) { LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. 
interval_t wait_duration = wait_until_time - lf_time_physical(); From 1e5e441e10771b03a95efa0effad1389e0253445 Mon Sep 17 00:00:00 2001 From: Chadlia Jerad Date: Mon, 16 Dec 2024 11:52:36 +0100 Subject: [PATCH 40/40] Account for delay when dropping a message to be forwarded to a transient federate --- core/federated/RTI/rti_remote.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index c1f8dfada..0153a0c72 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -664,10 +664,18 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff LF_MUTEX_LOCK(&rti_mutex); // If the destination federate is no longer connected, or it is a transient that has not started executing yet - // (the intended tag is less than the effective start tag of the destination), issue a warning, remove the message - // from the socket, and return. + // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the + // message from the socket, and return. federate_info_t* fed = GET_FED_INFO(federate_id); - if (fed->enclave.state == NOT_CONNECTED || lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) { + interval_t delay = NEVER; + for (int i = 0; i < fed->enclave.num_upstream; i++) { + if (fed->enclave.upstream[i] == sending_federate->enclave.id) { + delay = fed->enclave.upstream_delay[i]; + break; + } + } + if (fed->enclave.state == NOT_CONNECTED || + lf_tag_compare(lf_delay_tag(intended_tag, delay), fed->effective_start_tag) < 0) { lf_print_warning("RTI: Destination federate %d is not connected at logical time (" PRINTF_TAG "). Dropping message.", federate_id, intended_tag.time - start_time, intended_tag.microstep);