FastAPI Quickstart
This guide will explain how to stream inferences from any FastAPI server to a Parquet file on S3.

To follow this guide, you'll need:
- A Kubernetes cluster
- Kafka, with Schema Registry, Kafka Connect, and the Confluent S3 Sink connector plugin
To get started as quickly as possible, see the Kafka deployment tutorial, which shows how to set up Kafka in minutes.
First, you'll need to install the aiokafka library:
pip install aiokafka
Initialize a new Kafka producer. With FastAPI, it looks like this:
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

app = FastAPI()
aioproducer = None

@app.on_event("startup")
async def startup_event():
    # Create and start the producer when the server boots
    global aioproducer
    aioproducer = AIOKafkaProducer(bootstrap_servers="kafka-cp-kafka:9092")
    await aioproducer.start()

@app.on_event("shutdown")
async def shutdown_event():
    # Flush and close the producer on shutdown
    await aioproducer.stop()
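Note that kafka-cp-kafka:9092 matches the in-cluster service name you'd get if you followed the Kafka deployment tutorial; if your Kafka is deployed differently, adjust bootstrap_servers accordingly.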
Then, whenever you have a new prediction you can publish it to a Kafka topic:
import json
import uuid

@app.post("/predict")
async def predict(request: PredictRequest):
    ...

    # Publish the prediction to the "my-model" Kafka topic as JSON
    await aioproducer.send("my-model", json.dumps({
        "id": str(uuid.uuid4()),
        "model_name": "my model",
        "model_version": "v1",
        "inputs": [{
            "age": 38,
            "previously_insured": True,
        }],
        "outputs": [{
            "will_buy_insurance": True,
            "confidence": 0.98,
        }],
    }).encode("ascii"))
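The PredictRequest model isn't defined in this guide. As a minimal sketch, assuming the example inputs above, it could be a Pydantic model like this (the field names are hypothetical and should match your model's features):

from pydantic import BaseModel

class PredictRequest(BaseModel):
    # Hypothetical feature fields matching the example payload above
    age: int
    previously_insured: bool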
Finally, we can create an InferenceLogger resource to stream the predictions to S3 using InferenceDB:
apiVersion: inferencedb.aporia.com/v1alpha1
kind: InferenceLogger
metadata:
  name: my-model
  namespace: default
spec:
  topic: my-model
  events:
    type: json
    config: {}
  destination:
    type: confluent-s3
    config:
      url: s3://aporia-data/inferencedb
      format: parquet

  # Optional - Only if you want to override column names
  # schema:
  #   type: avro
  #   config:
  #     columnNames:
  #       inputs: [sepal_width, petal_width, sepal_length, petal_length]
  #       outputs: [flower]
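Apply the resource with kubectl (assuming you saved it to a file; inference-logger.yaml here is just an example name):

kubectl apply -f inference-logger.yaml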
If everything was configured correctly, your predictions should now be logged to a Parquet file on S3, which you can read with pandas:
import pandas as pd
df = pd.read_parquet("s3://aporia-data/inferencedb/default-my-model/")
print(df)
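Note that reading an s3:// path with pandas requires the s3fs package (and a Parquet engine such as pyarrow) to be installed, along with AWS credentials that can read the bucket.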