Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero-delay cycles #468

Open
wants to merge 51 commits into
base: transient-fed
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
fea2eed
Support zero-delay cycles
edwardalee Jul 15, 2024
7fd3ce5
Fix the type of _lf_zero_delay_cycle_action_table_size iterator to be…
ChadliaJerad Jul 15, 2024
ac14760
Removed unnecessary parens
edwardalee Jul 15, 2024
ec5b2d6
Run clang formatter
ChadliaJerad Jul 17, 2024
322dde6
Fixed thread interactions
edwardalee Jul 17, 2024
da02bb6
Format
edwardalee Jul 17, 2024
e902d3f
Fix mutex access in lf_delayed_grants()
ChadliaJerad Jul 18, 2024
c7ea64d
Made functions static and use free function for queue
edwardalee Jul 18, 2024
46abf8c
Added max function
edwardalee Jul 19, 2024
8c85e1f
Revert to holding mutex plus general cleanups
edwardalee Jul 19, 2024
60282e5
Hold mutex and notify downstream not upstream
edwardalee Jul 19, 2024
e97a619
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Jul 24, 2024
291d7fb
Fix the pqueue tag iterator type
ChadliaJerad Jul 24, 2024
71a4cb6
Fix condition 2 for computing the effective start time of a transient…
ChadliaJerad Jul 24, 2024
5ef6b54
Update lingua-franca-ref.txt
ChadliaJerad Jul 24, 2024
d41a6a4
Revert to 0 microstep
edwardalee Jul 27, 2024
07f3671
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Aug 7, 2024
6c20e64
Fix lf_get_federates_bin_directory() + Fix its scope, as well as lf_g…
ChadliaJerad Aug 7, 2024
286931d
Remove no more needed LF_FEDERATED_BIN_DIRECTORY
ChadliaJerad Aug 7, 2024
6b55db0
Run Clang formatter
ChadliaJerad Aug 7, 2024
f00c5af
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Aug 14, 2024
e8c930d
Remove overlooked code when merging
ChadliaJerad Aug 14, 2024
960307e
Handle corner cases where connection messages about transients are re…
ChadliaJerad Aug 21, 2024
d543cc7
Reset timing info from previous runs
ChadliaJerad Aug 21, 2024
2981542
Skip checking the satet in the first call of _update_min_delays_upsteam
ChadliaJerad Aug 21, 2024
7b7c01f
Invalidate min delays of all federates once a tansient joins
ChadliaJerad Aug 28, 2024
859a078
Do not skip the node itself in _updat_min_delays
ChadliaJerad Aug 28, 2024
f05d94f
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Oct 1, 2024
5653cec
Removed duplicated function
ChadliaJerad Dec 2, 2024
7952a94
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Dec 2, 2024
2e9fa98
Formatting
ChadliaJerad Dec 2, 2024
41442f1
Attempt to pass tests by manually adding prototypes to lf code
ChadliaJerad Dec 2, 2024
b189d36
Removed lf_get_federates_bin_directory. Use LF_FED_PACKAGE_DIRECTORY
edwardalee Dec 2, 2024
3318bb8
Do not account for absent trnsients when calculating efimt
ChadliaJerad Dec 4, 2024
5af472c
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Dec 6, 2024
83bf6c4
Restored declaration of lf_get_federation_id
edwardalee Dec 8, 2024
782d492
Attempt to make sure a persistent knows that a transient joined
ChadliaJerad Dec 9, 2024
844ffb0
Fix send_start_tag() function name, comments, and actions ordering
ChadliaJerad Dec 10, 2024
66f0275
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Dec 10, 2024
b7f3d5c
Fix message ordering in send_start_tag_locked() + apply suggestions t…
ChadliaJerad Dec 11, 2024
02730df
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Dec 11, 2024
bb67e63
Allow federate to handle that an upstream has connected or disconnect…
ChadliaJerad Dec 11, 2024
173bfc3
Format?!
ChadliaJerad Dec 11, 2024
6ad1897
Formatting using clang-format-19
ChadliaJerad Dec 11, 2024
9053feb
Fix dropping the message when a transient's effective start tag is no…
ChadliaJerad Dec 11, 2024
34d51b3
Simplify the code of message dropping in the remote RTI.
ChadliaJerad Dec 13, 2024
793fc9a
Do not exit if there is no timeout, fast is true, and keepalive is true
edwardalee Dec 14, 2024
6253d02
Merge pull request #504 from lf-lang/keepalive
edwardalee Dec 15, 2024
1e5e441
Account for delay when dropping a message to be forwarded to a transi…
ChadliaJerad Dec 16, 2024
75db4d6
Merge branch 'main' into transient-fed
ChadliaJerad Dec 16, 2024
b12f3c2
Merge branch 'transient-fed' into transient-fed-cycles
ChadliaJerad Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ define(SCHEDULER)
define(LF_SOURCE_DIRECTORY)
define(LF_SOURCE_GEN_DIRECTORY)
define(LF_PACKAGE_DIRECTORY)
define(LF_FEDERATES_BIN_DIRECTORY)
define(LF_FILE_SEPARATOR)
define(WORKERS_NEEDED_FOR_FEDERATE)
define(LF_ENCLAVES)
2 changes: 2 additions & 0 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
for (int i = 0; i < e->num_min_delays; i++) {
// Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay.
scheduling_node_t* upstream = rti_common->scheduling_nodes[e->min_delays[i].id];
if (upstream->state == NOT_CONNECTED)
continue;
// If we haven't heard from the upstream node, then assume it can send an event at the start time.
if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) {
tag_t start_tag = {.time = start_time, .microstep = 0};
Expand Down
404 changes: 243 additions & 161 deletions core/federated/RTI/rti_remote.c

Large diffs are not rendered by default.

13 changes: 6 additions & 7 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ typedef enum clock_sync_stat { clock_sync_off, clock_sync_init, clock_sync_on }
/**
* The federation life cycle phases.
*/
typedef enum federation_life_cycle_phase { startup_phase, execution_phase, shutdown_phase } federation_life_cycle_phase;
typedef enum federation_life_cycle_phase {
startup_phase, // Not all persistent federates have joined.
execution_phase, // All persistent federates have joined.
shutdown_phase // Federation is shutting down.
} federation_life_cycle_phase;

/**
* @brief The type for an element in a delayed grants priority queue that is sorted by tag.
Expand Down Expand Up @@ -391,7 +395,7 @@ void* federate_info_thread_TCP(void* fed);
* @param socket_id Pointer to the socket ID.
* @param error_code An error code.
*/
void send_reject(int* socket_id, unsigned char error_code);
void send_reject(int* socket_id, rejection_code_t error_code);

/**
* Wait for one incoming connection request from each (persistent) federate,
Expand All @@ -409,11 +413,6 @@ void lf_connect_to_persistent_federates(int socket_descriptor);
*/
void* lf_connect_to_transient_federates_thread(void* nothing);

/**
* Thread that manages the delayed grants using a priprity queue.
*/
void* lf_delayed_grants_thread(void* nothing);

/**
* Thread to respond to new connections, which could be federates of other
* federations who are attempting to join the wrong federation.
Expand Down
213 changes: 137 additions & 76 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ extern interval_t _lf_action_delay_table[];
extern size_t _lf_action_table_size;
extern lf_action_base_t* _lf_zero_delay_cycle_action_table[];
extern size_t _lf_zero_delay_cycle_action_table_size;
extern uint16_t _lf_zero_delay_cycle_upstream_ids[];
extern bool _lf_zero_delay_cycle_upstream_disconnected[];
extern reaction_t* network_input_reactions[];
extern size_t num_network_input_reactions;
extern reaction_t* port_absent_reaction[];
Expand All @@ -194,7 +196,7 @@ static lf_action_base_t* action_for_port(int port_id) {

/**
* Update the last known status tag of all network input ports
* to the value of `tag`, unless that the provided `tag` is less
* to the value of `tag`, unless the provided `tag` is less
* than the last_known_status_tag of the port. This is called when
* a TAG signal is received from the RTI in centralized coordination.
* If any update occurs, then this broadcasts on `lf_port_status_changed`.
Expand Down Expand Up @@ -260,7 +262,7 @@ static void update_last_known_status_on_input_ports(tag_t tag, environment_t* en
*
* @param env The top-level environment, whose mutex is assumed to be held.
* @param tag The tag on which the latest status of the specified network input port is known.
* @param portID The port ID.
* @param port_id The port ID.
*/
static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id) {
if (lf_tag_compare(tag, env->current_tag) < 0)
Expand Down Expand Up @@ -322,13 +324,41 @@ static void mark_inputs_known_absent(int fed_id) {
}

/**
* Set the status of network port with id portID.
* @brief Update the last known status tag of a network input action.
*
* This function is similar to update_last_known_status_on_input_port, but
* it is called when a PTAG is granted and an upstream transient federate is not
* connected. It updates the last known status tag of the network input action
* so that it will not wait for a message or absent message from the upstream federate.
*
* This function assumes the caller holds the mutex on the top-level environment,
* and, if the tag actually increases, it broadcasts on `lf_port_status_changed`.
*
* @param portID The network port ID
* @param env The top-level environment, whose mutex is assumed to be held.
* @param action The action associated with the network input port.
* @param tag The tag of the PTAG.
*/
static void update_last_known_status_on_action(environment_t* env, lf_action_base_t* action, tag_t tag) {
if (lf_tag_compare(tag, env->current_tag) < 0)
tag = env->current_tag;
trigger_t* input_port_trigger = action->trigger;
if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) {
LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG
" to " PRINTF_TAG ".",
input_port_trigger->last_known_status_tag.time - lf_time_start(),
input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep);
input_port_trigger->last_known_status_tag = tag;
}
}

/**
* Set the status of network port with id port_id.
*
* @param port_id The network port ID
* @param status The network port status (port_status_t)
*/
static void set_network_port_status(int portID, port_status_t status) {
lf_action_base_t* network_input_port_action = action_for_port(portID);
static void set_network_port_status(int port_id, port_status_t status) {
lf_action_base_t* network_input_port_action = action_for_port(port_id);
network_input_port_action->trigger->status = status;
}

Expand Down Expand Up @@ -720,7 +750,7 @@ static int handle_port_absent_message(int* socket, int fed_id) {
tracepoint_federate_from_federate(receive_PORT_ABS, _lf_my_fed_id, fed_id, &intended_tag);
}
LF_PRINT_LOG("Handling port absent for tag " PRINTF_TAG " for port %hu of fed %d.",
intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, fed_id);
intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, _lf_my_fed_id);

// Environment is always the one corresponding to the top-level scheduling enclave.
environment_t* env;
Expand Down Expand Up @@ -992,17 +1022,34 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) {
size_t buffer_length = MSG_TYPE_TIMESTAMP_START_LENGTH;
unsigned char buffer[buffer_length];

read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length, buffer, NULL,
"Failed to read MSG_TYPE_TIMESTAMP_START message from RTI.");
LF_PRINT_DEBUG("Read 21 bytes.");

// First byte received is the message ID.
if (buffer[0] != MSG_TYPE_TIMESTAMP_START) {
if (buffer[0] == MSG_TYPE_FAILED) {
lf_print_error_and_exit("RTI has failed.");
while (true) {
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, 1, buffer, NULL,
"Failed to read MSG_TYPE_TIMESTAMP_START message from RTI.");
// First byte received is the message ID.
if (buffer[0] != MSG_TYPE_TIMESTAMP_START) {
if (buffer[0] == MSG_TYPE_FAILED) {
lf_print_error_and_exit("RTI has failed.");
} else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) {
// We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
// FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set?
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH - 1, buffer + 1, NULL,
"Failed to complete reading MSG_TYPE_UPSTREAM_CONNECTED.");
continue;
} else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) {
// We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
// FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set?
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH - 1, buffer + 1,
NULL, "Failed to complete reading MSG_TYPE_UPSTREAM_DISCONNECTED.");
continue;
} else {
lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).",
buffer[0]);
}
} else {
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length - 1, buffer + 1, NULL,
"Failed to read MSG_TYPE_TIMESTAMP_START message from RTI.");
break;
}
lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).",
buffer[0]);
}

instant_t timestamp = extract_int64(&(buffer[1]));
Expand All @@ -1027,7 +1074,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) {
* a notification of this update, which may unblock whichever worker
* thread is trying to advance time.
*
* @note This function is very similar to handle_provisinal_tag_advance_grant() except that
* @note This function is very similar to handle_provisional_tag_advance_grant() except that
* it sets last_TAG_was_provisional to false.
*/
static void handle_tag_advance_grant(void) {
Expand Down Expand Up @@ -1269,7 +1316,8 @@ static void* update_ports_from_staa_offsets(void* args) {
*
* @note This function is similar to handle_tag_advance_grant() except that
* it sets last_TAG_was_provisional to true and also it does not update the
* last known tag for input ports.
* last known tag for input ports unless there is an upstream federate that is
* disconnected.
*/
static void handle_provisional_tag_advance_grant() {
// Environment is always the one corresponding to the top-level scheduling enclave.
Expand Down Expand Up @@ -1306,6 +1354,12 @@ static void handle_provisional_tag_advance_grant() {
env->current_tag.time - start_time, env->current_tag.microstep, _fed.last_TAG.time - start_time,
_fed.last_TAG.microstep);

for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_disconnected[i]) {
update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG);
}
}

// Even if we don't modify the event queue, we need to broadcast a change
// because we do not need to continue to wait for a TAG.
lf_cond_broadcast(&env->event_q_changed);
Expand Down Expand Up @@ -1542,6 +1596,44 @@ static void send_failed_signal() {
*/
static void handle_rti_failed_message(void) { exit(1); }

/**
* @brief Handle message from the RTI that an upstream federate has connected.
*
*/
static void handle_upstream_connected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream connected message from RTI.");
uint16_t connected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected);
// Mark the upstream as connected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == connected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = false;
}
}
}

/**
* @brief Handle message from the RTI that an upstream federate has disconnected.
*
*/
static void handle_upstream_disconnected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream disconnected message from RTI.");
uint16_t disconnected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected);
// Mark the upstream as disconnected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = true;
}
}
}

/**
* Thread that listens for TCP inputs from the RTI.
* When messages arrive, this calls the appropriate handler.
Expand Down Expand Up @@ -1618,6 +1710,12 @@ static void* listen_to_rti_TCP(void* args) {
case MSG_TYPE_FAILED:
handle_rti_failed_message();
break;
case MSG_TYPE_UPSTREAM_CONNECTED:
handle_upstream_connected_message();
break;
case MSG_TYPE_UPSTREAM_DISCONNECTED:
handle_upstream_disconnected_message();
break;
case MSG_TYPE_CLOCK_SYNC_T1:
case MSG_TYPE_CLOCK_SYNC_T4:
lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id);
Expand Down Expand Up @@ -2049,8 +2147,12 @@ void lf_connect_to_rti(const char* hostname, int port) {
} else if (response == MSG_TYPE_RESIGN) {
lf_print_warning("RTI on port %d resigned. Will try again", uport);
continue;
} else if (response == MSG_TYPE_UPSTREAM_CONNECTED) {
handle_upstream_connected_message();
} else if (response == MSG_TYPE_UPSTREAM_DISCONNECTED) {
handle_upstream_disconnected_message();
} else {
lf_print_warning("RTI on port %d gave unexpect response %u. Will try again", uport, response);
lf_print_warning("RTI on port %d gave unexpected response %u. Will try again", uport, response);
continue;
}
}
Expand Down Expand Up @@ -2778,13 +2880,20 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) {
_lf_action_delay_table[i])) <= 0)) {
continue;
}
#else
// For centralized coordination, if there is an upstream transient federate that is not
// connected, then we don't want to block on its action.
if (_lf_zero_delay_cycle_upstream_disconnected[i]) {
// Mark the action known up to and including the current tag. It is absent.
update_last_known_status_on_action(env, input_port_action, env->current_tag);
}
#endif // FEDERATED_DECENTRALIZED
// If the current tag is greater than the last known status tag of the input port,
// and the input port is not physical, then block on that port by ensuring
// the MLAA is no greater than the level of that port.
// For centralized coordination, this is applied only to input ports coming from
// federates that are in a ZDC. For decentralized coordination, this is applied
// to all input ports.
// If the current tag is greater than the last known status tag of the input port,
// and the input port is not physical, then block on that port by ensuring
// the MLAA is no greater than the level of that port.
// For centralized coordination, this is applied only to input ports coming from
// federates that are in a ZDC. For decentralized coordination, this is applied
// to all input ports.
if (lf_tag_compare(env->current_tag, input_port_action->trigger->last_known_status_tag) > 0 &&
!input_port_action->trigger->is_physical) {
max_level_allowed_to_advance =
Expand All @@ -2809,8 +2918,8 @@ void lf_stop() {

lf_set_stop_tag(&env[i], new_stop_tag);

lf_print("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time,
env[i].stop_tag.microstep);
LF_PRINT_LOG("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time,
env[i].stop_tag.microstep);

if (env[i].barrier.requestors)
_lf_decrement_tag_barrier_locked(&env[i]);
Expand All @@ -2820,54 +2929,6 @@ void lf_stop() {
LF_PRINT_LOG("Federate is stopping.");
}

char* lf_get_federates_bin_directory() {
bool bin_directory_defined = false;
#ifdef LF_FEDERATES_BIN_DIRECTORY
bin_directory_defined = true;
#endif
if (bin_directory_defined) {
return (LF_FEDERATES_BIN_DIRECTORY);
}
return NULL;
}

const char* lf_get_federation_id() { return federation_metadata.federation_id; }

void lf_stop() {
environment_t* env;
int num_env = _lf_get_environments(&env);

for (int i = 0; i < num_env; i++) {
LF_MUTEX_LOCK(&env[i].mutex);

tag_t new_stop_tag;
new_stop_tag.time = env[i].current_tag.time;
new_stop_tag.microstep = env[i].current_tag.microstep + 1;

lf_set_stop_tag(&env[i], new_stop_tag);

lf_print("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time,
env[i].stop_tag.microstep);

if (env[i].barrier.requestors)
_lf_decrement_tag_barrier_locked(&env[i]);
lf_cond_broadcast(&env[i].event_q_changed);
LF_MUTEX_UNLOCK(&env[i].mutex);
}
LF_PRINT_LOG("Federate is stopping.");
}

char* lf_get_federates_bin_directory() {
bool bin_directory_defined = false;
#ifdef LF_FEDERATES_BIN_DIRECTORY
bin_directory_defined = true;
#endif
if (bin_directory_defined) {
return (LF_FEDERATES_BIN_DIRECTORY);
}
return NULL;
}

const char* lf_get_federation_id() { return federation_metadata.federation_id; }

#ifdef FEDERATED_DECENTRALIZED
Expand Down
11 changes: 11 additions & 0 deletions core/utils/pqueue_tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,14 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) {
}

void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); }

tag_t pqueue_tag_max_tag(pqueue_tag_t* q) {
tag_t result = NEVER_TAG;
for (size_t i = 1; i < q->size; i++) {
pqueue_tag_element_t* element = (pqueue_tag_element_t*)(q->d[i]);
if (lf_tag_compare(element->tag, result) > 0) {
result = element->tag;
}
}
return result;
}
Loading
Loading