diff --git a/broker/consumer/rubin/README.md b/broker/consumer/rubin/README.md index a9daae0b..13565e85 100644 --- a/broker/consumer/rubin/README.md +++ b/broker/consumer/rubin/README.md @@ -1,344 +1,35 @@ -# Connect Pitt-Google to the Rubin alert stream testing deployment +# Start the Rubin consumer VM -December 2021 - Author: Troy Raen +See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions. -- [Overview](#overview) -- [Setup](#setup) -- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) -- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it) -- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) - -## Overview - -Details and access credentials were sent to us by Eric Bellm via email. -Spencer Nelson provided some additional details specific to our Kafka Connect consumer. -Here are some links they gave us for reference which were used to set this up: - -- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka) -- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md) -- Schemas are stored at: -- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html). - Spencer says, "Our stream uses Avro for the message values, not keys (we - don't set the key to anything in particular), so you probably want the - `value.converter` properties." -- Tools and libraries for VOEvents: - -- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer) - -Rubin alert packets will be Avro serialized, but the schema will not be included with the packet. -There are several ways to handle this. -For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized -alerts after pulling from the Pub/Sub stream. -For other methods, see -[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. - -Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull -messages from the resulting Pub/Sub stream and deserialize the alerts. - -## Setup - -The following assumes you have set the environment variables -`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` -to appropriate values for your GCP project and service account credentials, and that -the service account is authenticated to make `gcloud` calls through the project. -You may want to -[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) -or -[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). - -Clone the repo and cd into the directory: - -```bash -git clone https://github.com/mwvgroup/Pitt-Google-Broker.git -cd Pitt-Google-Broker -``` - -Define variables used below in multiple calls. 
-The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized +To start the consumer VM: ```bash -PROJECT_ID="${GOOGLE_CLOUD_PROJECT}" -# For reference, I ran this with: -# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing survey="rubin" -broker_bucket="${PROJECT_ID}-${survey}-broker_files" -consumerVM="${survey}-consumer" -firewallrule="tcpport9094" - -# Kafka credentials for the Rubin stream -KAFKA_USERNAME="pittgoogle-idfint" # set to correct username -KAFKA_PASSWORD="" # set to correct password - -PUBSUB_TOPIC="rubin-alerts" -PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}" -KAFKA_TOPIC="alerts-simulated" -``` - -Setup resources on Google Cloud Platform. - -```bash -# Create a firewall rule to open port 9094 (only needs to be done once, per project) -gcloud compute firewall-rules create "${firewallrule}" \ - --allow=tcp:9094 \ - --description="Allow incoming traffic on TCP port 9094" \ - --direction=INGRESS \ - --enable-logging - -# Create a Cloud Storage bucket to store the consumer config files -gsutil mb "gs://${broker_bucket}" - -# Upload the install script and config files for the consumer -o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs -gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}" - -# Create a Pub/Sub topic and subscription for Rubin alerts -gcloud pubsub topics create "${PUBSUB_TOPIC}" -gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}" - -# Create a Rubin Consumer VM +testid="mytest" +consumerVM="${survey}-consumer-${testid}" zone="us-central1-a" -machinetype="e2-standard-2" -installscript="gs://${broker_bucket}/consumer/vm_install.sh" -gcloud compute instances create "${consumerVM}" \ - --zone="${zone}" \ - --machine-type="${machinetype}" \ - --scopes=cloud-platform \ - --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \ - --tags="${firewallrule}" -``` - -## Ingest the Rubin test stream - -### Setup Consumer VM - -```bash -# start the consumer vm and ssh in -gcloud compute instances start "${consumerVM}" -gcloud compute ssh "${consumerVM}" - -# define some variables -brokerdir=/home/broker # user's home dir on this machine -workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine - -# We will also need the variables defined at the top of this document. -# Go back up to the "Setup" section and define the variables given -# in the code block under "Define variables...", in your environment. 
-``` - -### Test the connection - -#### Check available Kafka topics - -```bash -/bin/kafka-topics \ - --bootstrap-server alert-stream-int.lsst.cloud:9094 \ - --list \ - --command-config "${workingdir}/admin.properties" -# should see output that includes the topic: alerts-simulated -``` - -#### Test the topic connection using the Kafka Console Consumer - -Set Java env variable - -```bash -export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64" -``` - -Make a file called 'consumer.properties' and fill it with this -(change `KAFKA_PASSWORD` to the appropriate value): - -```bash -security.protocol=SASL_SSL -sasl.mechanism=SCRAM-SHA-512 - -sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ - username="pittgoogle-idfint"\ - password="KAFKA_PASSWORD"; -``` - -Run the Kafka console consumer -```bash -sudo /bin/kafka-avro-console-consumer \ - --bootstrap-server alert-stream-int.lsst.cloud:9094 \ - --group "${KAFKA_USERNAME}-example-javaconsole" \ - --topic "${KAFKA_TOPIC}" \ - --property schema.registry.url=https://alert-schemas-int.lsst.cloud \ - --consumer.config consumer.properties \ - --timeout-ms=60000 -# if successful, you will see a lot of JSON flood the terminal -``` - -### Run the Kafka -> Pub/Sub connector - -Setup: - -```bash -# download the config files from broker_bucket -sudo mkdir "${brokerdir}" -sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}" - -# set the password in two of the config files -sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties" -sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties" - -# replace topic and project configs in ps-connector.properties -fconfig="${workingdir}/ps-connector.properties" -sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} -sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig} -sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} -``` - -Run the connector: - -```bash -mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir -fout_run="${mydir}/run-connector.out" -sudo /bin/connect-standalone \ - ${workingdir}/psconnect-worker.properties \ - ${workingdir}/ps-connector.properties \ - &> ${fout_run} -``` - -## Pull a Pub/Sub message and open it - -In the future, we should download schemas from the Confluent Schema Registry and store them. -Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`. -See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. - -For now, use the schema in the `lsst-alert-packet` library. 
Install the library: - -```bash -pip install lsst-alert-packet -``` - -Following the deserialization example at - - -```python -import io -import fastavro -from google.cloud import pubsub_v1 -from lsst.alert.packet import Schema - -# pull a message -project_id = "avid-heading-329016" -subscription_name = "rubin-alerts" -max_messages = 5 - -subscriber = pubsub_v1.SubscriberClient() -subscription_path = subscriber.subscription_path(project_id, subscription_name) -request = { - "subscription": subscription_path, - "max_messages": max_messages, -} - -response = subscriber.pull(**request) - -# load the schema -latest_schema = Schema.from_file().definition +# Set the VM metadata +KAFKA_TOPIC="alerts-simulated" +PS_TOPIC="${survey}-alerts-${testid}" +gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \ + --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}" -# deserialize the alerts. -# This follows the deserialization example at -# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py -for received_message in response.received_messages: - alert_bytes = received_message.message.data - # header_bytes = alert_bytes[:5] - # schema_version = deserialize_confluent_wire_header(header_bytes) - content_bytes = io.BytesIO(alert_bytes[5:]) - alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema) - alertId = alert_dict['alertId'] - diaSourceId = alert_dict['diaSource']['diaSourceId'] - psFlux = alert_dict['diaSource']['psFlux'] - print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}") +# Start the VM +gcloud compute instances start ${consumerVM} --zone ${zone} +# this launches the startup script which configures and starts the +# Kafka -> Pub/Sub connector ``` -## Alternative methods for handling the schema - -### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header - -In the future, we should download schemas from the Confluent Schema Registry and store them -(assuming we do not use the schema registry directly in the Kafka connector). -Then for each alert, check the schema version in the Confluent Wire header, and load the schema -file using `fastavro`. - -Recommendation from Spencer Nelson: - -> You might want to look at how Rubin's alert database ingester works. It does the same steps of -> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet: -> -> -> - -Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached. -We would have to create a new topic for every schema version. -Therefore, I don't think we should do it this way. 
-
-#### Download a schema from the Confluent Schema Registry using a `GET` request
+To stop the consumer VM:
 
 ```bash
-SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD
-SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud"
-schema_version=1
-fout_rubinschema="rubinschema_v${schema_version}.avsc"
-
-# get list of schema subjects
-curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects"
-# download a particular schema
-curl --silent -X GET -u \
-    "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \
-    "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \
-    > "${fout_rubinschema}"
-```
-
-#### Read the alert's schema version from the Confluent Wire header
-
-The following is copied from
-
-
-```python
-import struct
-
-_ConfluentWireFormatHeader = struct.Struct(">bi")
-
-def deserialize_confluent_wire_header(raw):
-    """Parses the byte prefix for Confluent Wire Format-style Kafka messages.
-    Parameters
-    ----------
-    raw : `bytes`
-        The 5-byte encoded message prefix.
-    Returns
-    -------
-    schema_version : `int`
-        A version number which indicates the Confluent Schema Registry ID
-        number of the Avro schema used to encode the message that follows this
-        header.
-    """
-    _, version = _ConfluentWireFormatHeader.unpack(raw)
-    return version
-
-header_bytes = alert_bytes[:5]
-schema_version = deserialize_confluent_wire_header(header_bytes)
-```
-
-### Use the Confluent Schema Registry with the Kafka Connector
-
-Kafka Connect can use the Confluent Schema Registry directly.
-But schemas are stored under subjects and Kafka Connect is picky about how those
-subjects are named.
-See
-
-**Rubin has set the schema subject name to “alert-packet”**, which does not conform
-to any of the name strategies that Kafka Connect uses.
-I did not find a workaround for this issue.
-Instead, I passed the alert bytes straight through into Pub/Sub and deserialized
-them after pulling the messages from Pub/Sub.
-
-If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file.
+survey="rubin" +testid="mytest" +consumerVM="${survey}-consumer-${testid}" +zone="us-central1-a" -```bash -value.converter=io.confluent.connect.avro.AvroConverter -value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud -value.converter.enhanced.avro.schema.support=true +# Stop the VM +gcloud compute instances stop ${consumerVM} --zone ${zone} ``` diff --git a/broker/consumer/rubin/admin.properties b/broker/consumer/rubin/admin.properties index 1edb544b..7487803e 100644 --- a/broker/consumer/rubin/admin.properties +++ b/broker/consumer/rubin/admin.properties @@ -1,9 +1,8 @@ # see https://kafka.apache.org/documentation/#adminclientconfigs -bootstrap.servers=alert-stream-int.lsst.cloud:9094 +bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094 sasl.mechanism=SCRAM-SHA-512 -sasl.kerberos.service.name=kafka -security.protocol=SASL_SSL +security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; diff --git a/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md b/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md new file mode 100644 index 00000000..a9daae0b --- /dev/null +++ b/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md @@ -0,0 +1,344 @@ +# Connect Pitt-Google to the Rubin alert stream testing deployment + +December 2021 - Author: Troy Raen + +- [Overview](#overview) +- [Setup](#setup) +- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) +- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it) +- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) + +## Overview + +Details and access credentials were sent to us by Eric Bellm via email. +Spencer Nelson provided some additional details specific to our Kafka Connect consumer. +Here are some links they gave us for reference which were used to set this up: + +- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka) +- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md) +- Schemas are stored at: +- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html). + Spencer says, "Our stream uses Avro for the message values, not keys (we + don't set the key to anything in particular), so you probably want the + `value.converter` properties." +- Tools and libraries for VOEvents: + +- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer) + +Rubin alert packets will be Avro serialized, but the schema will not be included with the packet. +There are several ways to handle this. +For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized +alerts after pulling from the Pub/Sub stream. +For other methods, see +[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. + +Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull +messages from the resulting Pub/Sub stream and deserialize the alerts. 
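+
+For orientation, here is a condensed sketch of the framing involved (the sections below walk through the full
+workflow; `alert_bytes` stands for the raw data of one Pub/Sub message):
+
+```python
+import struct
+
+# Confluent wire format: 1 magic byte plus a 4-byte big-endian schema-registry ID,
+# followed by the schemaless Avro-encoded alert payload
+magic, schema_id = struct.unpack(">bi", alert_bytes[:5])
+avro_payload = alert_bytes[5:]  # deserialize with the matching schema, as shown below
+```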
+ +## Setup + +The following assumes you have set the environment variables +`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` +to appropriate values for your GCP project and service account credentials, and that +the service account is authenticated to make `gcloud` calls through the project. +You may want to +[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) +or +[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). + +Clone the repo and cd into the directory: + +```bash +git clone https://github.com/mwvgroup/Pitt-Google-Broker.git +cd Pitt-Google-Broker +``` + +Define variables used below in multiple calls. +The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized + +```bash +PROJECT_ID="${GOOGLE_CLOUD_PROJECT}" +# For reference, I ran this with: +# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing +survey="rubin" +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +consumerVM="${survey}-consumer" +firewallrule="tcpport9094" + +# Kafka credentials for the Rubin stream +KAFKA_USERNAME="pittgoogle-idfint" # set to correct username +KAFKA_PASSWORD="" # set to correct password + +PUBSUB_TOPIC="rubin-alerts" +PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}" +KAFKA_TOPIC="alerts-simulated" +``` + +Setup resources on Google Cloud Platform. + +```bash +# Create a firewall rule to open port 9094 (only needs to be done once, per project) +gcloud compute firewall-rules create "${firewallrule}" \ + --allow=tcp:9094 \ + --description="Allow incoming traffic on TCP port 9094" \ + --direction=INGRESS \ + --enable-logging + +# Create a Cloud Storage bucket to store the consumer config files +gsutil mb "gs://${broker_bucket}" + +# Upload the install script and config files for the consumer +o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs +gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}" + +# Create a Pub/Sub topic and subscription for Rubin alerts +gcloud pubsub topics create "${PUBSUB_TOPIC}" +gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}" + +# Create a Rubin Consumer VM +zone="us-central1-a" +machinetype="e2-standard-2" +installscript="gs://${broker_bucket}/consumer/vm_install.sh" +gcloud compute instances create "${consumerVM}" \ + --zone="${zone}" \ + --machine-type="${machinetype}" \ + --scopes=cloud-platform \ + --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \ + --tags="${firewallrule}" +``` + +## Ingest the Rubin test stream + +### Setup Consumer VM + +```bash +# start the consumer vm and ssh in +gcloud compute instances start "${consumerVM}" +gcloud compute ssh "${consumerVM}" + +# define some variables +brokerdir=/home/broker # user's home dir on this machine +workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine + +# We will also need the variables defined at the top of this document. +# Go back up to the "Setup" section and define the variables given +# in the code block under "Define variables...", in your environment. 
+``` + +### Test the connection + +#### Check available Kafka topics + +```bash +/bin/kafka-topics \ + --bootstrap-server alert-stream-int.lsst.cloud:9094 \ + --list \ + --command-config "${workingdir}/admin.properties" +# should see output that includes the topic: alerts-simulated +``` + +#### Test the topic connection using the Kafka Console Consumer + +Set Java env variable + +```bash +export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64" +``` + +Make a file called 'consumer.properties' and fill it with this +(change `KAFKA_PASSWORD` to the appropriate value): + +```bash +security.protocol=SASL_SSL +sasl.mechanism=SCRAM-SHA-512 + +sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="pittgoogle-idfint"\ + password="KAFKA_PASSWORD"; +``` + +Run the Kafka console consumer + +```bash +sudo /bin/kafka-avro-console-consumer \ + --bootstrap-server alert-stream-int.lsst.cloud:9094 \ + --group "${KAFKA_USERNAME}-example-javaconsole" \ + --topic "${KAFKA_TOPIC}" \ + --property schema.registry.url=https://alert-schemas-int.lsst.cloud \ + --consumer.config consumer.properties \ + --timeout-ms=60000 +# if successful, you will see a lot of JSON flood the terminal +``` + +### Run the Kafka -> Pub/Sub connector + +Setup: + +```bash +# download the config files from broker_bucket +sudo mkdir "${brokerdir}" +sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}" + +# set the password in two of the config files +sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties" +sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties" + +# replace topic and project configs in ps-connector.properties +fconfig="${workingdir}/ps-connector.properties" +sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} +sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig} +sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} +``` + +Run the connector: + +```bash +mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir +fout_run="${mydir}/run-connector.out" +sudo /bin/connect-standalone \ + ${workingdir}/psconnect-worker.properties \ + ${workingdir}/ps-connector.properties \ + &> ${fout_run} +``` + +## Pull a Pub/Sub message and open it + +In the future, we should download schemas from the Confluent Schema Registry and store them. +Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`. +See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. + +For now, use the schema in the `lsst-alert-packet` library. Install the library: + +```bash +pip install lsst-alert-packet +``` + +Following the deserialization example at + + +```python +import io +import fastavro +from google.cloud import pubsub_v1 +from lsst.alert.packet import Schema + +# pull a message +project_id = "avid-heading-329016" +subscription_name = "rubin-alerts" +max_messages = 5 + +subscriber = pubsub_v1.SubscriberClient() +subscription_path = subscriber.subscription_path(project_id, subscription_name) +request = { + "subscription": subscription_path, + "max_messages": max_messages, +} + +response = subscriber.pull(**request) + +# load the schema +latest_schema = Schema.from_file().definition + +# deserialize the alerts. 
+# This follows the deserialization example at +# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py +for received_message in response.received_messages: + alert_bytes = received_message.message.data + # header_bytes = alert_bytes[:5] + # schema_version = deserialize_confluent_wire_header(header_bytes) + content_bytes = io.BytesIO(alert_bytes[5:]) + alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema) + alertId = alert_dict['alertId'] + diaSourceId = alert_dict['diaSource']['diaSourceId'] + psFlux = alert_dict['diaSource']['psFlux'] + print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}") +``` + +## Alternative methods for handling the schema + +### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header + +In the future, we should download schemas from the Confluent Schema Registry and store them +(assuming we do not use the schema registry directly in the Kafka connector). +Then for each alert, check the schema version in the Confluent Wire header, and load the schema +file using `fastavro`. + +Recommendation from Spencer Nelson: + +> You might want to look at how Rubin's alert database ingester works. It does the same steps of +> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet: +> +> +> + +Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached. +We would have to create a new topic for every schema version. +Therefore, I don't think we should do it this way. + +#### Download a schema from the Confluent Schema Registry using a `GET` request + +```bash +SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD +SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud" +schema_version=1 +fout_rubinschema="rubinschema_v${schema_version}.avsc" + +# get list of schema subjects +curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects" +# download a particular schema +curl --silent -X GET -u \ + "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \ + "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \ + > "${fout_rubinschema}" +``` + +#### Read the alert's schema version from the Confluent Wire header + +The following is copied from + + +```python +import struct + +_ConfluentWireFormatHeader = struct.Struct(">bi") + +def deserialize_confluent_wire_header(raw): + """Parses the byte prefix for Confluent Wire Format-style Kafka messages. + Parameters + ---------- + raw : `bytes` + The 5-byte encoded message prefix. + Returns + ------- + schema_version : `int` + A version number which indicates the Confluent Schema Registry ID + number of the Avro schema used to encode the message that follows this + header. + """ + _, version = _ConfluentWireFormatHeader.unpack(raw) + return version + +header_bytes = alert_bytes[:5] +schema_version = deserialize_confluent_wire_header(header_bytes) +``` + +### Use the Confluent Schema Registry with the Kafka Connector + +Kafka Connect can use the Confluent Schema Registry directly. +But schemas are stored under subjects and Kafka Connect is picky about how those +subjects are named. +See + +**Rubin has set the schema subject name to “alert-packet”**, which does not conform +to any of the name strategies that Kafka Connect uses. +I did not find a workaround for this issue. +Instead, I passed the alert bytes straight through into Pub/Sub and deserialized +them after pulling the messages from Pub/Sub. 
+ +If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file. + +```bash +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud +value.converter.enhanced.avro.schema.support=true +``` diff --git a/broker/consumer/rubin/psconnect-worker.properties b/broker/consumer/rubin/psconnect-worker.properties index c169df0e..6243384d 100644 --- a/broker/consumer/rubin/psconnect-worker.properties +++ b/broker/consumer/rubin/psconnect-worker.properties @@ -1,7 +1,7 @@ # Kafka connect worker configuration # See: https://docs.confluent.io/platform/current/connect/references/allconfigs.html -bootstrap.servers=alert-stream-int.lsst.cloud:9094 +bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094 plugin.path=/usr/local/share/kafka/plugins offset.storage.file.filename=/tmp/connect.offsets @@ -27,7 +27,7 @@ value.converter=org.apache.kafka.connect.converters.ByteArrayConverter # workers need to use SASL sasl.mechanism=SCRAM-SHA-512 sasl.kerberos.service.name=kafka -security.protocol=SASL_SSL +security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; @@ -38,7 +38,7 @@ consumer.group.id=pittgoogle-idfint-kafka-pubsub-connector consumer.auto.offset.reset=earliest consumer.sasl.mechanism=SCRAM-SHA-512 consumer.sasl.kerberos.service.name=kafka -consumer.security.protocol=SASL_SSL +consumer.security.protocol=SASL_PLAINTEXT consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; diff --git a/broker/consumer/rubin/vm_install.sh b/broker/consumer/rubin/vm_install.sh new file mode 100644 index 00000000..0c51bb04 --- /dev/null +++ b/broker/consumer/rubin/vm_install.sh @@ -0,0 +1,75 @@ +#! /bin/bash +# Installs the software required to run the Kafka Consumer. +# Assumes a Debian 10 OS. + +#--- Get metadata attributes +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# parse the survey name and testid from the VM name +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" +fi + +#--- Install general utils +apt-get update +apt-get install -y wget screen software-properties-common snapd +# software-properties-common installs add-apt-repository +# install yq (requires snap) +snap install core +snap install yq + +#--- Install Java and the dev kit +# see https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-debian-10 +apt update +echo "Installing Java..." +apt install -y default-jre +apt install -y default-jdk +echo 'JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"' >> /etc/environment +# shellcheck source=/dev/null +source /etc/environment +echo "$JAVA_HOME" +echo "Done installing Java." 
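+# optional sanity check: the Confluent Platform packages installed below need a working Java runtime
+java -version || echo "WARNING: java not found on PATH"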
+apt update + +#--- Install Confluent Platform (includes Kafka) +# see https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html +echo "Installing Confluent Platform..." +# install the key used to sign packages +wget -qO - https://packages.confluent.io/deb/6.0/archive.key | sudo apt-key add - +# add the repository +add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.0 stable main" +# install +apt-get update && sudo apt-get install -y confluent-platform +echo "Done installing Confluent Platform." + +#--- Install Kafka -> Pub/Sub connector +# see https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector +echo "Installing the Kafka -> Pub/Sub connector" +plugindir=/usr/local/share/kafka/plugins +CONNECTOR_RELEASE=v0.5-alpha +mkdir -p ${plugindir} +#- install the connector +cd ${plugindir} || exit +wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR_RELEASE}/pubsub-kafka-connector.jar +echo "Done installing the Kafka -> Pub/Sub connector" + +#--- Set the startup script and shutdown +startupscript="gs://${broker_bucket}/consumer/${survey}/vm_startup.sh" +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata startup-script-url="$startupscript" +echo "vm_install.sh is complete. Shutting down." +shutdown -h now \ No newline at end of file diff --git a/broker/consumer/rubin/vm_shutdown.sh b/broker/consumer/rubin/vm_shutdown.sh new file mode 100644 index 00000000..f0aabf91 --- /dev/null +++ b/broker/consumer/rubin/vm_shutdown.sh @@ -0,0 +1,11 @@ +#! /bin/bash + +# Get VM name +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +vm_name="$(curl "${baseurl}/instance/name" -H "${H}")" +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# Unset FORCE topics in metadata so there's no unexpected behvaior on next startup +topics="KAFKA_TOPIC_FORCE=,PS_TOPIC_FORCE=" +gcloud compute instances add-metadata "${vm_name}" --zone="${zone}" --metadata="${topics}" diff --git a/broker/consumer/rubin/vm_startup.sh b/broker/consumer/rubin/vm_startup.sh new file mode 100644 index 00000000..ae86d44c --- /dev/null +++ b/broker/consumer/rubin/vm_startup.sh @@ -0,0 +1,98 @@ +#! 
/bin/bash +# Configure and Start the Kafka -> Pub/Sub connector + +brokerdir=/home/broker + +#--- Get project and instance metadata +# for info on working with metadata, see here +# https://cloud.google.com/compute/docs/storing-retrieving-metadata +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") +PS_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/PS_TOPIC_FORCE" -H "${H}") +KAFKA_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/KAFKA_TOPIC_FORCE" -H "${H}") +# parse the survey name and testid from the VM name +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +PS_TOPIC_DEFAULT="${survey}-alerts" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" + PS_TOPIC_DEFAULT="${PS_TOPIC_DEFAULT}-${testid}" +fi + +#--- Download config files from GCS +# remove all files +rm -r "${brokerdir}" +# download fresh files +mkdir "${brokerdir}" +cd ${brokerdir} || exit +gsutil -m cp -r "gs://${broker_bucket}/consumer" . +# wait. otherwise the script may continue before all files are downloaded, with adverse behavior. +sleep 30s + +#--- Set the topic names to the "FORCE" metadata attributes if exist, else defaults +KAFKA_TOPIC_DEFAULT="alerts-simulated" +KAFKA_TOPIC="${KAFKA_TOPIC_FORCE:-${KAFKA_TOPIC_DEFAULT}}" +PS_TOPIC="${PS_TOPIC_FORCE:-${PS_TOPIC_DEFAULT}}" +# set VM metadata, just for clarity and easy viewing +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata="PS_TOPIC=${PS_TOPIC},KAFKA_TOPIC=${KAFKA_TOPIC}" + +#--- Files this script will write +workingdir="${brokerdir}/consumer/${survey}" +fout_run="${workingdir}/run-connector.out" +fout_topics="${workingdir}/list.topics" + +#--- Set the connector's configs (Kafka password, project, and topic) +# define Rubin-related parameters +kafka_password="${survey}-${PROJECT_ID}-kafka-password" +KAFKA_PASSWORD=$(gcloud secrets versions access latest --secret="${kafka_password}") + +cd "${workingdir}" || exit + +fconfig=admin.properties +sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} + +fconfig=psconnect-worker.properties +sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} + +fconfig=ps-connector.properties +sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} +sed -i "s/PS_TOPIC/${PS_TOPIC}/g" ${fconfig} +sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} + +#--- Check until alerts start streaming into the topic +alerts_flowing=false +while [ "${alerts_flowing}" = false ] +do + # get list of topics and dump to file + /bin/kafka-topics \ + --bootstrap-server usdf-alert-stream-dev.lsst.cloud:9094 \ + --list \ + --command-config "${workingdir}/admin.properties" \ + > "${fout_topics}" + + # check if our topic is in the list + if grep -Fq "${KAFKA_TOPIC}" "${fout_topics}" + then + alerts_flowing=true # start consuming + else + sleep 60s # sleep 1 min, then try again + fi +done + +#--- Start the Kafka -> Pub/Sub connector, save stdout and stderr to file +/bin/connect-standalone \ + "${workingdir}/psconnect-worker.properties" \ + "${workingdir}/ps-connector.properties" \ + &>> "${fout_run}" diff --git 
a/broker/setup_broker/rubin/README.md b/broker/setup_broker/rubin/README.md new file mode 100644 index 00000000..454798da --- /dev/null +++ b/broker/setup_broker/rubin/README.md @@ -0,0 +1,130 @@ +# Connect Pitt-Google to the Rubin-Broker integration test stream + +May 2024 - Author: Christopher Hernández + +- [Overview](#overview) +- [Setup](#setup) +- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) +- [Delete broker instance](#delete-broker-instance) + +## Overview + +During the first week of May 2024, a broker integration exercise was held and focused on the topic of connectivity. +This document outlines my procedure in connecting to the test stream. + +For this exercise, the same credentials as the IDF integration exercise were used. Credential information was emailed +to me by Troy Raen, and the credential value (e.g., `kafka_password`) was stored as a +[secret](https://cloud.google.com/secret-manager/docs/overview#secret) using Google Cloud's +[Secret Manager](https://cloud.google.com/secret-manager/docs/overview). + +Details on the number of alerts in this exercise from a conversation with Brianna Smart: +"The DC2 finished producing alerts, putting 303,288 new alerts into the stream, for a total of 376,800, the first +76,800 of which are a small subset duplicated 3 times." + +## Setup + +This section assumes that you have: + +- Set the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` to appropriate values for +your GCP project and service account credentials +- Authenticated the service account to make `gcloud` calls through the project +- Enabled the [Secret Manager API](https://cloud.google.com/secret-manager/docs/configuring-secret-manager#enable_api) +in your Google Cloud Project +- Granted the default compute service account the role of `Secret Manager Secret Accessor` in the +[IAM & Admin page](https://console.cloud.google.com/iam-admin) + +You may want to +[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) +or +[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). + +Create a secret for your access credential: + +```bash +# define parameters +survey="rubin" +PROJECT_ID=$GOOGLE_CLOUD_PROJECT + +# define secret names +kafka_password="${survey}-${PROJECT_ID}-kafka-password" + +# create secret +gcloud secrets create "${kafka_password}" \ + --replication-policy="automatic" +``` + +Select one of the following options to add a secret version. Note that adding a version directly on the command line is +discouraged by Google Cloud, see +[add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) +for details. 
+
+```bash
+# add a secret version from the contents of a file on disk
+# (the file should contain only the Kafka password)
+gcloud secrets versions add "${kafka_password}" --data-file="/path/to/file.txt"
+
+# add a secret version directly on the command line
+echo -n "enter the kafka password" | \
+    gcloud secrets versions add "${kafka_password}" --data-file=-
+```
+
+Clone the repo and cd into the directory:
+
+```bash
+git clone https://github.com/mwvgroup/Pitt-Google-Broker.git
+cd Pitt-Google-Broker/broker/setup_broker/rubin
+```
+
+Define the variables used below.
+
+```bash
+testid="enter testid value"
+teardown="False"
+survey="rubin"
+region="us-central1"
+```
+
+Execute the `setup_broker.sh` script:
+
+```bash
+./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}"
+```
+
+This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. Once
+complete, the VM will shut down automatically. You can check the status of the VM in the
+[Google Cloud Console](https://console.cloud.google.com/compute). This entire process should take less than 10 minutes.
+
+## Ingest the Rubin test stream
+
+### Setup Consumer VM
+
+```bash
+zone="${region}-a"
+consumerVM="${survey}-consumer-${testid}"
+
+# Set the VM metadata
+KAFKA_TOPIC="alerts-simulated"
+PS_TOPIC="${survey}-alerts-${testid}"
+gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \
+    --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}"
+
+# Start the VM
+gcloud compute instances start "${consumerVM}" --zone "${zone}"
+# this launches the startup script which configures and starts the
+# Kafka -> Pub/Sub connector
+```
+
+## Delete broker instance
+
+Initialize parameters and call the deployment script:
+
+```bash
+testid="mytest"
+teardown="True"
+survey="rubin"
+region="us-central1"
+
+./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}"
+```
diff --git a/broker/setup_broker/rubin/create_vm.sh b/broker/setup_broker/rubin/create_vm.sh
new file mode 100755
index 00000000..8b7ad6e8
--- /dev/null
+++ b/broker/setup_broker/rubin/create_vm.sh
@@ -0,0 +1,46 @@
+#! /bin/bash
+# Creates or deletes the GCP VM instances needed by the broker.
+# This script will not delete VMs that are in production + + +broker_bucket=$1 # name of GCS bucket where broker files are staged +testid="${2:-test}" +# "False" uses production resources +# any other string will be appended to the names of all resources +teardown="${3:-False}" # "True" tearsdown/deletes resources, else setup +survey="${4:-rubin}" +# name of the survey this broker instance will ingest +zone="${5:-us-central1-a}" +firewallrule="${6:-tcpport9094}" + +#--- GCP resources used in this script +consumerVM="${survey}-consumer" +# use test resources, if requested +if [ "$testid" != "False" ]; then + consumerVM="${consumerVM}-${testid}" +fi + +#--- Teardown resources +if [ "$teardown" = "True" ]; then + # ensure that we do not teardown production resources + if [ "$testid" != "False" ]; then + gcloud compute instances delete "$consumerVM" --zone="$zone" + fi + +#--- Create resources +else +#--- Consumer VM + # create VM + machinetype=e2-standard-2 + # metadata + googlelogging="google-logging-enabled=true" + startupscript="startup-script-url=gs://${broker_bucket}/consumer/${survey}/vm_install.sh" + shutdownscript="shutdown-script-url=gs://${broker_bucket}/consumer/${survey}/vm_shutdown.sh" + gcloud compute instances create "${consumerVM}" \ + --zone="${zone}" \ + --machine-type="${machinetype}" \ + --scopes=cloud-platform \ + --metadata="${googlelogging},${startupscript},${shutdownscript}" \ + --tags="${firewallrule}" + +fi diff --git a/broker/setup_broker/rubin/setup_broker.sh b/broker/setup_broker/rubin/setup_broker.sh new file mode 100755 index 00000000..e3518aad --- /dev/null +++ b/broker/setup_broker/rubin/setup_broker.sh @@ -0,0 +1,91 @@ +#! /bin/bash +# Create and configure GCP resources needed to run the nightly broker. + +testid="${1:-test}" +# "False" uses production resources +# any other string will be appended to the names of all resources +teardown="${2:-False}" +# "True" tearsdown/deletes resources, else setup +survey="${3:-rubin}" +# name of the survey this broker instance will ingest +region="${4:-us-central1}" +zone="${region}-a" # just use zone "a" instead of adding another script arg + +PROJECT_ID=$GOOGLE_CLOUD_PROJECT # get the environment variable + +#--- Make the user confirm the settings +echo +echo "setup_broker.sh will run with the following configs: " +echo +echo "GOOGLE_CLOUD_PROJECT = ${PROJECT_ID}" +echo "survey = ${survey}" +echo "testid = ${testid}" +echo "teardown = ${teardown}" +echo +echo "Continue? [y/(n)]: " + +read -r continue_with_setup +continue_with_setup="${continue_with_setup:-n}" +if [ "$continue_with_setup" != "y" ]; then + echo "Exiting setup." + echo + exit +fi + +#--- GCP resources used directly in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +topic_alerts="${survey}-alerts" +pubsub_subscription="${topic_alerts}" +# use test resources, if requested +# (there must be a better way to do this) +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" + topic_alerts="${topic_alerts}-${testid}" + pubsub_subscription="${pubsub_subscription}-${testid}" +fi + + +#--- Create (or delete) GCS, Pub/Sub resources +if [ "${teardown}" != "True" ]; then + # create broker bucket and upload files + echo "Creating broker_bucket and uploading files..." + gsutil mb -b on -l "${region}" "gs://${broker_bucket}" + ./upload_broker_bucket.sh "${broker_bucket}" + + # create pubsub + echo "Configuring Pub/Sub resources..." 
+    gcloud pubsub topics create "${topic_alerts}"
+    gcloud pubsub subscriptions create "${pubsub_subscription}" --topic="${topic_alerts}"
+
+    # Set IAM policies on resources
+    user="allUsers"
+    roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
+    gcloud pubsub topics add-iam-policy-binding "${topic_alerts}" --member="${user}" --role="${roleid}"
+
+else
+    # ensure that we do not teardown production resources
+    if [ "${testid}" != "False" ]; then
+        o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
+        gsutil -m -o "${o}" rm -r "gs://${broker_bucket}"
+        gcloud pubsub topics delete "${topic_alerts}"
+        gcloud pubsub subscriptions delete "${pubsub_subscription}"
+    fi
+fi
+
+if [ "$teardown" != "True" ]; then
+    #--- Create a firewall rule to open the port used by Kafka/Rubin
+    # on any instance with the flag --tags=tcpport9094
+    echo
+    echo "Configuring Rubin/Kafka firewall rule..."
+    firewallrule="tcpport9094"
+    gcloud compute firewall-rules create "${firewallrule}" \
+        --allow=tcp:9094 \
+        --description="Allow incoming traffic on TCP port 9094" \
+        --direction=INGRESS \
+        --enable-logging
+fi
+
+#--- Create VM instances
+echo
+echo "Configuring VMs..."
+./create_vm.sh "${broker_bucket}" "${testid}" "${teardown}" "${survey}" "${zone}" "${firewallrule}"
diff --git a/broker/setup_broker/rubin/upload_broker_bucket.sh b/broker/setup_broker/rubin/upload_broker_bucket.sh
new file mode 100755
index 00000000..ae85979e
--- /dev/null
+++ b/broker/setup_broker/rubin/upload_broker_bucket.sh
@@ -0,0 +1,8 @@
+#! /bin/bash
+
+broker_bucket=$1 # name of GCS bucket where broker files should be staged
+
+echo
+echo "Uploading broker files to GCS..."
+o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
+gsutil -m -o "${o}" cp -r ../../consumer "gs://${broker_bucket}"