diff --git a/config/samples/prequal/prequal_client.appnet b/config/samples/prequal/prequal_client.appnet
index fb5d847..553bd5c 100644
--- a/config/samples/prequal/prequal_client.appnet
+++ b/config/samples/prequal/prequal_client.appnet
@@ -1,68 +1,63 @@
 state:
-    QRIF: float
-    num_choices: int
+    RIF_map: Map // Per-backend requests-in-flight (RIF) estimates collected from probes.
+    latency_map: Map // Per-backend latency estimates collected from probes (assumed state, mirroring RIF_map; used for cold-backend selection below).
+    QRIF: float // Quantile value to separate hot and cold backends based on RIF.
+    num_choices: int // Number of replicas to sample from the available backends.
 
 init():
-    QRIF = 0.8
-    num_choices = 3
+    QRIF = 0.8 // Initialize QRIF to 0.8, indicating the cutoff for hot backends is at the 80th percentile.
+    num_choices = 3 // Initialize the number of backends to sample to 3.
 
 req(rpc):
-    # Choose a subset of replicas
-    backends = get_backends()
-    sampled_backends = ramdom_choices(num_choices, 3)
-
-    # Get their load and RIF information and store it in a list
-    sorted_backend_info = sort(map(get_backend_info(), sampled_backends), RIF)
-    index = int(len(sorted_backend_info)*QRIF)
-
-    # (hot-cold lexicographic) Divide it to hot and cold backend
-    cold_backend = slice(sorted_backend_info, 0, index)
-    hot_backend = slice(sorted_backend_info, index, len(sorted_backend_info))
-
-    # if all probes are hot, then the one with lowest RIF is chosen.
-    # Otherwise, the cold probe with the lowest latency is chosen
-    match len(coldBackend) == 0:
-        true =>
-            selected = get(hot_backend, 0)
-        false =>
-            selected = min(cold_backend, latency)
-
-    set(rpc, dst, selected)
-    send(rpc, down)
-
-
-resp(rpc):
-    send(rpc, up)
-
-
-state:
-    QRIF: float
-    num_choices: int
-
-init():
-    QRIF = 0.8
-    num_choices = 3
-
-req(rpc):
-    # Choose a subset of replicas
-    backends = get_backends()
-    sampled_backends = ramdom_choices(backends, num_choices)
-
-    # Get their load and RIF information and store it in a list
-    # get_backend_info is a built-in function that receives backend name and returns a dictionary: {"backend": xxx, "latency": xxx, "RIF": xxx}
-    sorted_backend_info = sort(map(get_backend_info, sampled_backends), "RIF")
-    index = int(len(sorted_backend_info) * QRIF)
-
-    match index == 0:
-        true =>
-            # all probes are hot
-            selected = get(array_get(sorted_backend_info, 0), "backend")
-        false =>
-            # cold probe with the lowest latency is chosen
-            selected = get(array_min(sorted_backend_info, 0, index, "latency"), "backend")
+    // Choose a subset of replicas
+    backends = get_backends() // Retrieve the list of available backends.
+    // Randomly select a subset of backends of size `num_choices`.
+    sampled_backends = random_choices(backends, num_choices)
+
+    // Estimate the RIF distribution across the sampled backends and derive the
+    // hot/cold cutoff at the QRIF quantile.
+    RIF_distribution = estimate_RIF_distribution(sampled_backends)
+    hot_threshold = quantile(RIF_distribution, QRIF)
+
+    hot_backends = []
+    cold_backends = []
+
+    // Classify each sampled backend as hot or cold based on its RIF.
+    foreach(sampled_backends, lambda(backend):
+        RIF = get(RIF_map, backend)
+        latency = get(latency_map, backend) // Probe-reported latency for this backend (from the assumed latency_map state).
+        match(RIF > hot_threshold):
+            true =>
+                set(hot_backends, size(hot_backends), (backend, RIF))
+            false =>
+                set(cold_backends, size(cold_backends), (backend, latency))
+    )
+
+    selected = 0 // Placeholder; overwritten by the selection below.
-    set(rpc, dst, selected)
-    send(rpc, down)
+    // Hot-cold lexicographic rule: if all probes are hot, pick the hot backend
+    // with the lowest RIF; otherwise pick the cold backend with the lowest latency.
+    match(len(cold_backends) == 0):
+        true =>
+            min_RIF = inf
+            foreach(hot_backends, lambda(hot_backend):
+                backend, RIF = hot_backend
+                match(RIF < min_RIF):
+                    true =>
+                        selected = backend
+                        min_RIF = RIF
+                    false =>
+                        pass
+            )
+
+        false =>
+            min_latency = inf
+            foreach(cold_backends, lambda(cold_backend):
+                backend, latency = cold_backend
+                match(latency < min_latency):
+                    true =>
+                        selected = backend
+                        min_latency = latency
+                    false =>
+                        pass
+            )
+
+    set(rpc, dst, selected) // Set the selected backend as the destination for the RPC.
+    send(rpc, down) // Send the RPC downstream to the selected backend.
 
 resp(rpc):
-    send(rpc, up)
\ No newline at end of file
+    send(rpc, up) // Forward the response upstream.
diff --git a/config/samples/prequal/prequal_server.appnet b/config/samples/prequal/prequal_server.appnet
index 8fd67e0..23c7f77 100644
--- a/config/samples/prequal/prequal_server.appnet
+++ b/config/samples/prequal/prequal_server.appnet
@@ -1,19 +1,31 @@
 state:
-    outstanding_req_count: int
-    request_map
+    request_in_flight: int // Tracks the number of requests currently in flight.
+    request_map: Map // Maps an RPC ID to a tuple containing the request's in-flight count at arrival and its start time.
+    latency_map: Map // Maps an in-flight count to a vector of latency values for requests at that count.
 
 init():
-    outstanding_req_count = 0
+    request_in_flight = 0 // Initialize the in-flight request count to zero.
 
 req(rpc):
-    outstanding_req_count = outstanding_req_count + 1
-    rpc_id = get(rpc, "id")
-    set(request_map, rpc_id, current_time())
-    send(rpc, up)
+    request_in_flight = request_in_flight + 1 // Increment the in-flight request count when a new request is received.
+
+    rpc_id = get(rpc, "id") // Extract the unique ID of the RPC.
+    // Store the current in-flight count and the current time as the start time for the RPC ID.
+    set(request_map, rpc_id, (request_in_flight, current_time()))
+
+    send(rpc, up) // Send the RPC upstream for further processing.
 
 resp(rpc):
-    outstanding_req_count = outstanding_req_count - 1
-    rpc_id = get(rpc, "id")
-    latency = time_diff(current_time(), get(request_map, rpc_id))
-    send(rpc, down)
+    request_in_flight = request_in_flight - 1 // Decrement the in-flight request count when a response is received.
+
+    rpc_id = get(rpc, "id") // Extract the unique ID of the RPC.
+    // Retrieve the in-flight count and start time associated with the RPC ID.
+    (arrival_rif, start_time) = get(request_map, rpc_id)
+    // Calculate the latency as the difference between the current time and the start time.
+    latency = time_diff(current_time(), start_time)
+    // Retrieve the vector of latency values corresponding to the in-flight count.
+    latency_vec = get(latency_map, arrival_rif)
+    // Add the calculated latency to the latency vector.
+    set(latency_vec, size(latency_vec), latency)
+    set(latency_map, arrival_rif, latency_vec) // Write the updated vector back (assuming get returns a copy rather than a reference).
+    send(rpc, down) // Send the RPC downstream after processing the response.
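For reference, a minimal Python sketch of the hot-cold lexicographic rule the client element implements, following the rank-based split (index = int(len * QRIF)) from the sample. The probe dictionaries and their field names ("backend", "rif", "latency") are placeholders for this sketch only, not part of the AppNet API.

def select_backend(probes, qrif=0.8):
    # probes: list of dicts like {"backend": str, "rif": int, "latency": float}.
    by_rif = sorted(probes, key=lambda p: p["rif"])
    split = int(len(by_rif) * qrif)   # probes below the QRIF quantile are "cold"
    cold, hot = by_rif[:split], by_rif[split:]
    if not cold:
        # All probes are hot: fall back to the hot backend with the lowest RIF.
        return hot[0]["backend"]
    # Otherwise pick the cold backend with the lowest latency.
    return min(cold, key=lambda p: p["latency"])["backend"]

# Example: with QRIF = 0.8 and three probes, the highest-RIF probe is hot and
# the other two are cold; "b" wins on latency even though "a" has the lowest RIF.
print(select_backend([
    {"backend": "a", "rif": 1, "latency": 9.0},
    {"backend": "b", "rif": 2, "latency": 3.0},
    {"backend": "c", "rif": 8, "latency": 1.0},
]))  # -> "b"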