ENH: Add stream option in Benchmark #2038

Merged: 11 commits, Aug 9, 2024
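Both files below swap their inline request loops and statistics for runner classes imported from a new benchmark_runner module that is not included in this diff. The following is only a minimal sketch of the interface the call sites appear to assume; the constructor signature, _run, send_request, print_stats, and the left counter are inferred from how they are used in the two files, not copied from the actual module.

# Hypothetical sketch of the benchmark_runner interface assumed by the diffs
# below; the real module is not part of this excerpt.
import time
from typing import List, Optional, Tuple


class BenchmarkRunner:
    def __init__(
        self,
        api_url: str,
        model_uid: str,
        input_requests: List[Tuple[str, int, int]],  # (prompt, prompt_len, output_len)
        stream: bool = False,
        api_key: Optional[str] = None,
    ):
        self.api_url = api_url
        self.model_uid = model_uid
        self.input_requests = input_requests
        self.stream = stream
        self.api_key = api_key
        # (prompt_len, output_len, latency) for each completed request.
        self.request_latency: List[Tuple[int, int, float]] = []
        self.benchmark_time = 0.0

    async def run(self):
        # Time the whole benchmark around the subclass-specific scheduling.
        start = time.time()
        await self._run()
        self.benchmark_time = time.time() - start

    async def _run(self):
        # Subclasses decide how requests are scheduled (sequentially, via
        # worker tasks, ...).
        raise NotImplementedError

    async def send_request(self, request: Tuple[str, int, int]):
        # In the real module this issues the HTTP request (streaming or not,
        # depending on self.stream) and appends to self.request_latency.
        raise NotImplementedError

    def print_stats(self):
        # Reports totals and latency statistics; see the sketch after the
        # benchmark_latency.py diff.
        raise NotImplementedError


class ConcurrentBenchmarkRunner(BenchmarkRunner):
    def __init__(self, *args, concurrency: int = 1, **kwargs):
        super().__init__(*args, **kwargs)
        self.concurrency = concurrency
        self.left = len(self.input_requests)  # requests not yet completed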
75 changes: 30 additions & 45 deletions benchmark/benchmark_latency.py
@@ -16,29 +16,27 @@
import asyncio
import logging
import random
import time
from typing import List, Tuple, Optional

import numpy as np
from utils import get_tokenizer, sample_requests, send_request
from utils import get_tokenizer, sample_requests
from benchmark_runner import BenchmarkRunner


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REQUEST_LATENCY: List[Tuple[int, int, float]] = []


async def benchmark(
api_url: str,
model_uid: str,
input_requests: List[Tuple[str, int, int]],
api_key: Optional[str] = None,
) -> None:
for request in input_requests:
prompt, prompt_len, output_len = request
await send_request(
api_url, model_uid, prompt, prompt_len, output_len, REQUEST_LATENCY
)
class LatencyBenchmarkRunner(BenchmarkRunner):
async def _run(self):
total_requests = len(self.input_requests)
for i, request in enumerate(self.input_requests):
await self.send_request(request)
remaining = total_requests - (i + 1)
print(
f"\rProcessed {i + 1}/{total_requests} requests, {remaining} remaining.",
end="",
)
print("")


def main(args: argparse.Namespace):
@@ -54,36 +52,17 @@ def main(args: argparse.Namespace):
input_requests = sample_requests(args.dataset, args.num_prompts, tokenizer)

logger.info("Benchmark starts.")
benchmark_start_time = time.time()

asyncio.run(
benchmark(
api_url,
model_uid,
input_requests,
api_key=args.api_key,
)
)

benchmark_end_time = time.time()
benchmark_time = benchmark_end_time - benchmark_start_time
print(f"Total time: {benchmark_time:.2f} s")
print(f"Throughput: {len(REQUEST_LATENCY) / benchmark_time:.2f} requests/s")

# Compute the latency statistics.
avg_latency = np.mean([latency for _, _, latency in REQUEST_LATENCY])
print(f"Average latency: {avg_latency:.2f} s")
avg_per_token_latency = np.mean(
[
latency / (prompt_len + output_len)
for prompt_len, output_len, latency in REQUEST_LATENCY
]
)
print(f"Average latency per token: {avg_per_token_latency:.2f} s")
avg_per_output_token_latency = np.mean(
[latency / output_len for _, output_len, latency in REQUEST_LATENCY]
benchmark = LatencyBenchmarkRunner(
api_url,
model_uid,
input_requests,
args.stream,
args.api_key,
)
print("Average latency per output token: " f"{avg_per_output_token_latency:.2f} s")
asyncio.run(benchmark.run())

benchmark.print_stats()


if __name__ == "__main__":
@@ -109,7 +88,13 @@ def main(args: argparse.Namespace):
)
parser.add_argument("--model-uid", type=str, help="Xinference model UID.")
parser.add_argument(
"--api-key", type=str, default=None, help="Authorization api key",
"--stream", action="store_true", help="Enable streaming responses."
)
parser.add_argument(
"--api-key",
type=str,
default=None,
help="Authorization api key",
)

args = parser.parse_args()
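For reference, the statistics that main() previously printed inline (removed above) are presumably what print_stats() now reports. A sketch that simply mirrors the removed code, written as a standalone function over the recorded (prompt_len, output_len, latency) tuples:

# Hypothetical reconstruction of the output print_stats() would produce,
# mirroring the inline code removed from main() above.
from typing import List, Tuple

import numpy as np


def print_stats(
    request_latency: List[Tuple[int, int, float]], benchmark_time: float
) -> None:
    print(f"Total time: {benchmark_time:.2f} s")
    print(f"Throughput: {len(request_latency) / benchmark_time:.2f} requests/s")

    latencies = [latency for _, _, latency in request_latency]
    print(f"Average latency: {np.mean(latencies):.2f} s")

    per_token = [
        latency / (prompt_len + output_len)
        for prompt_len, output_len, latency in request_latency
    ]
    print(f"Average latency per token: {np.mean(per_token):.2f} s")

    per_output_token = [
        latency / output_len for _, output_len, latency in request_latency
    ]
    print(f"Average latency per output token: {np.mean(per_output_token):.2f} s")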
89 changes: 21 additions & 68 deletions benchmark/benchmark_long.py
@@ -16,62 +16,33 @@
import asyncio
import logging
import random
import time
from typing import List, Tuple, Optional

import numpy as np

from utils import generate_sorting_prompts, get_tokenizer, send_request
from utils import generate_sorting_prompts, get_tokenizer
from benchmark_runner import ConcurrentBenchmarkRunner


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REQUEST_LATENCY: List[Tuple[int, int, float]] = []


class BenchmarkRunner:

def __init__(
self,
api_url: str,
model_uid: str,
input_requests: List[Tuple[str, int, int]],
concurrency: int,
api_key: Optional[str]=None,
):

self.api_url = api_url
self.model_uid = model_uid
self.input_requests = input_requests
self.concurrency = concurrency
self.sent = 0
self.left = len(input_requests)
self.api_key = api_key

async def run(self):
class LongBenchmarkRunner(ConcurrentBenchmarkRunner):
async def _run(self):
tasks = []
for i in range(0, self.concurrency):
for i in range(self.concurrency):
tasks.append(asyncio.create_task(self.worker(i)))
await asyncio.gather(*tasks)

await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

async def worker(self, i: int):
r = random.Random(i)
index = r.randint(0, len(self.input_requests) - 1)
while self.sent < len(self.input_requests):
prompt, prompt_len, output_len = self.input_requests[index]
while self.left > 0:
request = self.input_requests[index]
index += 1
self.sent += 1
index = index % len(self.input_requests)
await send_request(
self.api_url,
self.model_uid,
prompt,
prompt_len,
output_len,
REQUEST_LATENCY,
api_key=self.api_key,
)
await self.send_request(request)
self.left -= 1
# print trailing spaces so the message overwrites the previous, longer line as left decreases
print("\rdone_request, left %d " % (self.left), end="")
@@ -85,6 +56,9 @@ def main(args: argparse.Namespace):
args.concurrency = args.num_prompts
print(args)

random.seed(args.seed)
np.random.seed(args.seed)

api_url = f"http://{args.host}:{args.port}/v1/chat/completions"
model_uid = args.model_uid

@@ -97,43 +71,18 @@
)

logger.info("Benchmark starts.")
benchmark_start_time = time.time()

benchmark = BenchmarkRunner(
benchmark = LongBenchmarkRunner(
api_url,
model_uid,
input_requests,
args.stream,
concurrency=args.concurrency,
api_key=args.api_key,
)
asyncio.run(benchmark.run())
benchmark_end_time = time.time()
benchmark_time = benchmark_end_time - benchmark_start_time
print(f"Total time: {benchmark_time:.2f} s")
print(f"Throughput: {args.num_prompts / benchmark_time:.2f} requests/s")

# Compute the latency statistics.
avg_latency = np.mean([latency for _, _, latency in REQUEST_LATENCY])
print(f"Average latency: {avg_latency:.2f} s")
avg_per_token_latency = np.mean(
[
latency / (prompt_len + output_len)
for prompt_len, output_len, latency in REQUEST_LATENCY
]
)
print(f"Average latency per token: {avg_per_token_latency:.2f} s")
avg_per_output_token_latency = np.mean(
[latency / output_len for _, output_len, latency in REQUEST_LATENCY]
)
print("Average latency per output token: " f"{avg_per_output_token_latency:.2f} s")
average_io_tokens = np.average(
[(prompt_len + output_len) for prompt_len, output_len, _ in REQUEST_LATENCY]
)
print(f"Average io length:" f"{average_io_tokens}")
throughput = (
sum([output_len for _, output_len, _ in REQUEST_LATENCY]) / benchmark_time
)
print(f"Throughput: {throughput} tokens/s")

benchmark.print_stats()


if __name__ == "__main__":
@@ -167,5 +116,9 @@ def main(args: argparse.Namespace):
parser.add_argument(
"--api-key", type=str, default=None, help="Authorization api key",
)
parser.add_argument("--seed", type=int, default=0)
parser.add_argument(
"--stream", action="store_true", help="Enable streaming responses."
)
args = parser.parse_args()
main(args)
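The new --stream flag only reaches the runner here; the request code that actually honors it lives outside this excerpt. As a rough illustration of what streaming changes, a streamed chat-completions request with aiohttp could be timed like the sketch below. The payload keys and latency bookkeeping are assumptions for illustration, not the project's implementation.

# Rough sketch of a streaming request; NOT the actual send_request used by
# these benchmarks, whose implementation is outside this diff.
import time
from typing import List, Optional, Tuple

import aiohttp


async def send_streaming_request(
    api_url: str,
    model_uid: str,
    prompt: str,
    prompt_len: int,
    output_len: int,
    request_latency: List[Tuple[int, int, float]],
    api_key: Optional[str] = None,
) -> None:
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    payload = {
        "model": model_uid,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,  # the behavior toggled by the new --stream flag
        "max_tokens": output_len,  # assumed; the real payload may differ
    }

    start = time.time()
    async with aiohttp.ClientSession() as session:
        async with session.post(api_url, json=payload, headers=headers) as resp:
            # Consume server-sent chunks as they arrive; with stream=True the
            # server returns data incrementally instead of one JSON body.
            async for _chunk in resp.content:
                pass
    latency = time.time() - start
    request_latency.append((prompt_len, output_len, latency))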