Skip to content

Commit

Permalink
Merge pull request #224 from mwvgroup/u/ch/rubin
Browse files Browse the repository at this point in the history
Participate in Rubin-Broker Integration Exercise (May 2024)
  • Loading branch information
hernandezc1 authored May 28, 2024
2 parents b6bbc69 + a2fb705 commit 60f8a63
Show file tree
Hide file tree
Showing 11 changed files with 829 additions and 336 deletions.
351 changes: 21 additions & 330 deletions broker/consumer/rubin/README.md
Original file line number Diff line number Diff line change
@@ -1,344 +1,35 @@
# Connect Pitt-Google to the Rubin alert stream testing deployment
# Start the Rubin consumer VM

December 2021 - Author: Troy Raen
See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions.

- [Overview](#overview)
- [Setup](#setup)
- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream)
- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it)
- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema)

## Overview

Details and access credentials were sent to us by Eric Bellm via email.
Spencer Nelson provided some additional details specific to our Kafka Connect consumer.
Here are some links they gave us for reference which were used to set this up:

- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka)
- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md)
- Schemas are stored at: <https://alert-schemas-int.lsst.cloud/>
- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html).
Spencer says, "Our stream uses Avro for the message values, not keys (we
don't set the key to anything in particular), so you probably want the
`value.converter` properties."
- Tools and libraries for VOEvents:
<https://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#Tools_and_Libraries>
- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer)

Rubin alert packets will be Avro serialized, but the schema will not be included with the packet.
There are several ways to handle this.
For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized
alerts after pulling from the Pub/Sub stream.
For other methods, see
[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.

Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull
messages from the resulting Pub/Sub stream and deserialize the alerts.

## Setup

The following assumes you have set the environment variables
`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS`
to appropriate values for your GCP project and service account credentials, and that
the service account is authenticated to make `gcloud` calls through the project.
You may want to
[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use)
or
[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment).

Clone the repo and cd into the directory:

```bash
git clone https://github.com/mwvgroup/Pitt-Google-Broker.git
cd Pitt-Google-Broker
```

Define variables used below in multiple calls.
The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized
To start the consumer VM:

```bash
PROJECT_ID="${GOOGLE_CLOUD_PROJECT}"
# For reference, I ran this with:
# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing
survey="rubin"
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
consumerVM="${survey}-consumer"
firewallrule="tcpport9094"

# Kafka credentials for the Rubin stream
KAFKA_USERNAME="pittgoogle-idfint" # set to correct username
KAFKA_PASSWORD="" # set to correct password

PUBSUB_TOPIC="rubin-alerts"
PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}"
KAFKA_TOPIC="alerts-simulated"
```

Setup resources on Google Cloud Platform.

```bash
# Create a firewall rule to open port 9094 (only needs to be done once, per project)
gcloud compute firewall-rules create "${firewallrule}" \
--allow=tcp:9094 \
--description="Allow incoming traffic on TCP port 9094" \
--direction=INGRESS \
--enable-logging

# Create a Cloud Storage bucket to store the consumer config files
gsutil mb "gs://${broker_bucket}"

# Upload the install script and config files for the consumer
o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}"

# Create a Pub/Sub topic and subscription for Rubin alerts
gcloud pubsub topics create "${PUBSUB_TOPIC}"
gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}"

# Create a Rubin Consumer VM
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"
machinetype="e2-standard-2"
installscript="gs://${broker_bucket}/consumer/vm_install.sh"
gcloud compute instances create "${consumerVM}" \
--zone="${zone}" \
--machine-type="${machinetype}" \
--scopes=cloud-platform \
--metadata=google-logging-enabled=true,startup-script-url="${installscript}" \
--tags="${firewallrule}"
```

## Ingest the Rubin test stream

### Setup Consumer VM

```bash
# start the consumer vm and ssh in
gcloud compute instances start "${consumerVM}"
gcloud compute ssh "${consumerVM}"

# define some variables
brokerdir=/home/broker # user's home dir on this machine
workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine

# We will also need the variables defined at the top of this document.
# Go back up to the "Setup" section and define the variables given
# in the code block under "Define variables...", in your environment.
```

### Test the connection

#### Check available Kafka topics

```bash
/bin/kafka-topics \
--bootstrap-server alert-stream-int.lsst.cloud:9094 \
--list \
--command-config "${workingdir}/admin.properties"
# should see output that includes the topic: alerts-simulated
```

#### Test the topic connection using the Kafka Console Consumer

Set Java env variable

```bash
export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
```

Make a file called 'consumer.properties' and fill it with this
(change `KAFKA_PASSWORD` to the appropriate value):

```bash
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="pittgoogle-idfint"\
password="KAFKA_PASSWORD";
```

Run the Kafka console consumer

```bash
sudo /bin/kafka-avro-console-consumer \
--bootstrap-server alert-stream-int.lsst.cloud:9094 \
--group "${KAFKA_USERNAME}-example-javaconsole" \
--topic "${KAFKA_TOPIC}" \
--property schema.registry.url=https://alert-schemas-int.lsst.cloud \
--consumer.config consumer.properties \
--timeout-ms=60000
# if successful, you will see a lot of JSON flood the terminal
```

### Run the Kafka -> Pub/Sub connector

Setup:

```bash
# download the config files from broker_bucket
sudo mkdir "${brokerdir}"
sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}"

# set the password in two of the config files
sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties"
sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties"

# replace topic and project configs in ps-connector.properties
fconfig="${workingdir}/ps-connector.properties"
sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig}
sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig}
sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig}
```

Run the connector:

```bash
mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir
fout_run="${mydir}/run-connector.out"
sudo /bin/connect-standalone \
${workingdir}/psconnect-worker.properties \
${workingdir}/ps-connector.properties \
&> ${fout_run}
```

## Pull a Pub/Sub message and open it

In the future, we should download schemas from the Confluent Schema Registry and store them.
Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`.
See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.

For now, use the schema in the `lsst-alert-packet` library. Install the library:

```bash
pip install lsst-alert-packet
```

Following the deserialization example at
<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>

```python
import io
import fastavro
from google.cloud import pubsub_v1
from lsst.alert.packet import Schema

# pull a message
project_id = "avid-heading-329016"
subscription_name = "rubin-alerts"
max_messages = 5

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
request = {
"subscription": subscription_path,
"max_messages": max_messages,
}

response = subscriber.pull(**request)

# load the schema
latest_schema = Schema.from_file().definition
# Set the VM metadata
KAFKA_TOPIC="alerts-simulated"
PS_TOPIC="${survey}-alerts-${testid}"
gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \
--metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}"

# deserialize the alerts.
# This follows the deserialization example at
# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py
for received_message in response.received_messages:
alert_bytes = received_message.message.data
# header_bytes = alert_bytes[:5]
# schema_version = deserialize_confluent_wire_header(header_bytes)
content_bytes = io.BytesIO(alert_bytes[5:])
alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema)
alertId = alert_dict['alertId']
diaSourceId = alert_dict['diaSource']['diaSourceId']
psFlux = alert_dict['diaSource']['psFlux']
print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}")
# Start the VM
gcloud compute instances start ${consumerVM} --zone ${zone}
# this launches the startup script which configures and starts the
# Kafka -> Pub/Sub connector
```

## Alternative methods for handling the schema

### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header

In the future, we should download schemas from the Confluent Schema Registry and store them
(assuming we do not use the schema registry directly in the Kafka connector).
Then for each alert, check the schema version in the Confluent Wire header, and load the schema
file using `fastavro`.

Recommendation from Spencer Nelson:

> You might want to look at how Rubin's alert database ingester works. It does the same steps of
> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet:
>
> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/ingester.py#L192-L209>
> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/schema_registry.py>
Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached.
We would have to create a new topic for every schema version.
Therefore, I don't think we should do it this way.

#### Download a schema from the Confluent Schema Registry using a `GET` request
To stop stop the consumer VM:

```bash
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD
SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud"
schema_version=1
fout_rubinschema="rubinschema_v${schema_version}.avsc"

# get list of schema subjects
curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects"
# download a particular schema
curl --silent -X GET -u \
"${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \
"${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \
> "${fout_rubinschema}"
```

#### Read the alert's schema version from the Confluent Wire header

The following is copied from
<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>

```python
import struct

_ConfluentWireFormatHeader = struct.Struct(">bi")

def deserialize_confluent_wire_header(raw):
"""Parses the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
----------
raw : `bytes`
The 5-byte encoded message prefix.
Returns
-------
schema_version : `int`
A version number which indicates the Confluent Schema Registry ID
number of the Avro schema used to encode the message that follows this
header.
"""
_, version = _ConfluentWireFormatHeader.unpack(raw)
return version

header_bytes = alert_bytes[:5]
schema_version = deserialize_confluent_wire_header(header_bytes)
```

### Use the Confluent Schema Registry with the Kafka Connector

Kafka Connect can use the Confluent Schema Registry directly.
But schemas are stored under subjects and Kafka Connect is picky about how those
subjects are named.
See
<https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy>
**Rubin has set the schema subject name to “alert-packet”**, which does not conform
to any of the name strategies that Kafka Connect uses.
I did not find a workaround for this issue.
Instead, I passed the alert bytes straight through into Pub/Sub and deserialized
them after pulling the messages from Pub/Sub.

If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file.
survey="rubin"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

```bash
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud
value.converter.enhanced.avro.schema.support=true
# Stop the VM
gcloud compute instances stop ${consumerVM} --zone ${zone}
```
5 changes: 2 additions & 3 deletions broker/consumer/rubin/admin.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# see https://kafka.apache.org/documentation/#adminclientconfigs

bootstrap.servers=alert-stream-int.lsst.cloud:9094
bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094
sasl.mechanism=SCRAM-SHA-512
sasl.kerberos.service.name=kafka
security.protocol=SASL_SSL
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="pittgoogle-idfint"\
password="KAFKA_PASSWORD";
Loading

0 comments on commit 60f8a63

Please sign in to comment.