Skip to content

Commit

Permalink
update prequal
Browse files Browse the repository at this point in the history
  • Loading branch information
Romero027 committed Dec 27, 2024
1 parent c578ed5 commit 4b636b0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 71 deletions.
115 changes: 55 additions & 60 deletions config/samples/prequal/prequal_client.appnet
Original file line number Diff line number Diff line change
@@ -1,68 +1,63 @@
state:
QRIF: float
num_choices: int
RIF_map: Map<int, int>
QRIF: float // Quantile value to separate hot and cold backends based on RIF.
num_choices: int // Number of replicas to sample from the available backends.

init():
QRIF = 0.8
num_choices = 3
QRIF = 0.8 // Initialize QRIF to 0.8, indicating the cutoff for hot backends is at the 80th percentile.
num_choices = 3 // Initialize the number of backends to sample to 3.

req(rpc):
# Choose a subset of replicas
backends = get_backends()
sampled_backends = ramdom_choices(num_choices, 3)

# Get their load and RIF information and store it in a list
sorted_backend_info = sort(map(get_backend_info(), sampled_backends), RIF)
index = int(len(sorted_backend_info)*QRIF)

# (hot-cold lexicographic) Divide it to hot and cold backend
cold_backend = slice(sorted_backend_info, 0, index)
hot_backend = slice(sorted_backend_info, index, len(sorted_backend_info))

# if all probes are hot, then the one with lowest RIF is chosen.
# Otherwise, the cold probe with the lowest latency is chosen
match len(coldBackend) == 0:
true =>
selected = get(hot_backend, 0)
false =>
selected = min(cold_backend, latency)

set(rpc, dst, selected)
send(rpc, down)


resp(rpc):
send(rpc, up)


state:
QRIF: float
num_choices: int

init():
QRIF = 0.8
num_choices = 3

req(rpc):
# Choose a subset of replicas
backends = get_backends()
sampled_backends = ramdom_choices(backends, num_choices)

# Get their load and RIF information and store it in a list
# get_backend_info is a built-in function that receives backend name and returns a dictionary: {"backend": xxx, "latency": xxx, "RIF": xxx}
sorted_backend_info = sort(map(get_backend_info, sampled_backends), "RIF")
index = int(len(sorted_backend_info) * QRIF)

match index == 0:
true =>
# all probes are hot
selected = get(array_get(sorted_backend_info, 0), "backend")
false =>
# cold probe with the lowest latency is chosen
selected = get(array_min(sorted_backend_info, 0, index, "latency"), "backend")
// Choose a subset of replicas
backends = get_backends() // Retrieve the list of available backends.
// Randomly select a subset of backends of size `num_choices`.
sampled_backends = random_choices(backends, num_choices)

RIF_distribution = estimate_RIF_distribution(sampled_backends)
hot_threshold = quantile(RIF_distribution, QRIF)

hot_backends = []
cold_backends = []

foreach(sampled_backends, lambda(backend):
RIF = get(RIF_map, backend)
match(RIF > hot_threshold):
true =>
set(hot_backends, size(hot_backends), (backend, RIF))
false =>
set(cold_backends, size(cold_backends), (backend, latency))
)

selected = 0

set(rpc, dst, selected)
send(rpc, down)
match(len(hot_backend)==0):
true =>
min_latency = inf
foreach(cold_backends, lambda(cold_backend):
backend, latency = cold_backend
match(RIF < min_RIF):
true =>
selected = backend
min_latency = latency
false =>
pass
)

false =>
min_RIF = inf
foreach(hot_backends, lambda(hot_backend):
backend, RIF = hot_backend
match(RIF < min_RIF):
true =>
selected = backend
min_RIF = RIF
false =>
pass
)


set(rpc, dst, selected) // Set the selected backend as the destination for the RPC.
send(rpc, down) // Send the RPC downstream to the selected backend.

resp(rpc):
send(rpc, up)
send(rpc, up) // Forward the response upstream.
34 changes: 23 additions & 11 deletions config/samples/prequal/prequal_server.appnet
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
state:
outstanding_req_count: int
request_map<int, float>
request_in_flight: int // Tracks the number of requests currently in flight.
request_map: Map<int, <int, float>> // Maps an RPC ID to a tuple containing the request's in-flight count at arrival and its start time.
latency_map: Map<int, Vec<float>> // Maps an in-flight count to a vector of latency values for requests at that count.

init():
outstanding_req_count = 0
request_in_flight = 0 // Initialize the in-flight request count to zero.

req(rpc):
outstanding_req_count = outstanding_req_count + 1
rpc_id = get(rpc, "id")
set(request_map, rpc_id, current_time())
send(rpc, up)
request_in_flight = request_in_flight + 1 // Increment the in-flight request count when a new request is received.

rpc_id = get(rpc, "id") // Extract the unique ID of the RPC.
// Store the current in-flight count and the current time as the start time for the RPC ID.
set(request_map, rpc_id, (request_in_flight, current_time()))

send(rpc, up) // Send the RPC upstream for further processing.

resp(rpc):
outstanding_req_count = outstanding_req_count - 1
rpc_id = get(rpc, "id")
latency = time_diff(current_time(), get(request_map, rpc_id))
send(rpc, down)
request_in_flight = request_in_flight - 1 // Decrement the in-flight request count when a response is received.

rpc_id = get(rpc, "id") // Extract the unique ID of the RPC.
// Retrieve the in-flight count and start time associated with the RPC ID.
(arrival_rif, start_time) = get(request_map, rpc_id)
// Calculate the latency as the difference between the current time and the start time.
latency = time_diff(current_time(), start_time)
// Retrieve the vector of latency values corresponding to the in-flight count.
latency_vec = get(latency_map, arrival_rif)
// Add the calculated latency to the latency vector.
set(latency_vec, size(latency_vec), latency)

send(rpc, down) // Send the RPC downstream after processing the response.

0 comments on commit 4b636b0

Please sign in to comment.