From 6598cb1e114b26d0ffbdad178ee30319edf62f33 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Wed, 15 May 2024 13:50:16 -0400 Subject: [PATCH 01/15] add files needed to create vm and connect to test stream --- broker/consumer/rubin/admin.properties | 5 +- .../rubin/psconnect-worker.properties | 6 +- broker/consumer/rubin/vm_install.sh | 74 +++++++++++++ broker/consumer/rubin/vm_shutdown.sh | 11 ++ broker/consumer/rubin/vm_startup.sh | 102 ++++++++++++++++++ broker/setup_broker/rubin/create_vms.sh | 46 ++++++++ broker/setup_broker/rubin/setup_broker.sh | 89 +++++++++++++++ .../rubin/upload_broker_bucket.sh | 8 ++ 8 files changed, 335 insertions(+), 6 deletions(-) create mode 100644 broker/consumer/rubin/vm_install.sh create mode 100644 broker/consumer/rubin/vm_shutdown.sh create mode 100644 broker/consumer/rubin/vm_startup.sh create mode 100644 broker/setup_broker/rubin/create_vms.sh create mode 100644 broker/setup_broker/rubin/setup_broker.sh create mode 100755 broker/setup_broker/rubin/upload_broker_bucket.sh diff --git a/broker/consumer/rubin/admin.properties b/broker/consumer/rubin/admin.properties index 1edb544b..7487803e 100644 --- a/broker/consumer/rubin/admin.properties +++ b/broker/consumer/rubin/admin.properties @@ -1,9 +1,8 @@ # see https://kafka.apache.org/documentation/#adminclientconfigs -bootstrap.servers=alert-stream-int.lsst.cloud:9094 +bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094 sasl.mechanism=SCRAM-SHA-512 -sasl.kerberos.service.name=kafka -security.protocol=SASL_SSL +security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; diff --git a/broker/consumer/rubin/psconnect-worker.properties b/broker/consumer/rubin/psconnect-worker.properties index c169df0e..6243384d 100644 --- a/broker/consumer/rubin/psconnect-worker.properties +++ b/broker/consumer/rubin/psconnect-worker.properties @@ -1,7 +1,7 @@ # Kafka connect worker 
configuration # See: https://docs.confluent.io/platform/current/connect/references/allconfigs.html -bootstrap.servers=alert-stream-int.lsst.cloud:9094 +bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094 plugin.path=/usr/local/share/kafka/plugins offset.storage.file.filename=/tmp/connect.offsets @@ -27,7 +27,7 @@ value.converter=org.apache.kafka.connect.converters.ByteArrayConverter # workers need to use SASL sasl.mechanism=SCRAM-SHA-512 sasl.kerberos.service.name=kafka -security.protocol=SASL_SSL +security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; @@ -38,7 +38,7 @@ consumer.group.id=pittgoogle-idfint-kafka-pubsub-connector consumer.auto.offset.reset=earliest consumer.sasl.mechanism=SCRAM-SHA-512 consumer.sasl.kerberos.service.name=kafka -consumer.security.protocol=SASL_SSL +consumer.security.protocol=SASL_PLAINTEXT consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="pittgoogle-idfint"\ password="KAFKA_PASSWORD"; diff --git a/broker/consumer/rubin/vm_install.sh b/broker/consumer/rubin/vm_install.sh new file mode 100644 index 00000000..f147fa40 --- /dev/null +++ b/broker/consumer/rubin/vm_install.sh @@ -0,0 +1,74 @@ +#! /bin/bash +# Installs the software required to run the Kafka Consumer. +# Assumes a Debian 10 OS. 
+ +#--- Get metadata attributes +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# parse the survey name and testid from the VM name +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" +fi + +#--- Install general utils +apt-get update +apt-get install -y wget screen software-properties-common snapd +# software-properties-common installs add-apt-repository +# install yq (requires snap) +snap install core +snap install yq + +#--- Install Java and the dev kit +# see https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-debian-10 +apt update +echo "Installing Java..." +apt install -y default-jre +apt install -y default-jdk +echo 'JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"' >> /etc/environment +source /etc/environment +echo $JAVA_HOME +echo "Done installing Java." +apt update + +#--- Install Confluent Platform (includes Kafka) +# see https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html +echo "Installing Confluent Platform..." +# install the key used to sign packages +wget -qO - https://packages.confluent.io/deb/6.0/archive.key | sudo apt-key add - +# add the repository +add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.0 stable main" +# install +apt-get update && sudo apt-get install -y confluent-platform +echo "Done installing Confluent Platform." 
+ +#--- Install Kafka -> Pub/Sub connector +# see https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector +echo "Installing the Kafka -> Pub/Sub connector" +plugindir=/usr/local/share/kafka/plugins +CONNECTOR_RELEASE=v0.5-alpha +mkdir -p ${plugindir} +#- install the connector +cd ${plugindir} +wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR_RELEASE}/pubsub-kafka-connector.jar +echo "Done installing the Kafka -> Pub/Sub connector" + +#--- Set the startup script and shutdown +startupscript="gs://${broker_bucket}/consumer/${survey}/vm_startup.sh" +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata startup-script-url="$startupscript" +echo "vm_install.sh is complete. Shutting down." +shutdown -h now \ No newline at end of file diff --git a/broker/consumer/rubin/vm_shutdown.sh b/broker/consumer/rubin/vm_shutdown.sh new file mode 100644 index 00000000..f0aabf91 --- /dev/null +++ b/broker/consumer/rubin/vm_shutdown.sh @@ -0,0 +1,11 @@ +#! /bin/bash + +# Get VM name +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +vm_name="$(curl "${baseurl}/instance/name" -H "${H}")" +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# Unset FORCE topics in metadata so there's no unexpected behavior on next startup +topics="KAFKA_TOPIC_FORCE=,PS_TOPIC_FORCE=" +gcloud compute instances add-metadata "${vm_name}" --zone="${zone}" --metadata="${topics}" diff --git a/broker/consumer/rubin/vm_startup.sh b/broker/consumer/rubin/vm_startup.sh new file mode 100644 index 00000000..df7a8d8c --- /dev/null +++ b/broker/consumer/rubin/vm_startup.sh @@ -0,0 +1,102 @@ +#! 
/bin/bash +# Configure and Start the Kafka -> Pub/Sub connector + +brokerdir=/home/broker + +#--- Get project and instance metadata +# for info on working with metadata, see here +# https://cloud.google.com/compute/docs/storing-retrieving-metadata +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") +PS_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/PS_TOPIC_FORCE" -H "${H}") +KAFKA_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/KAFKA_TOPIC_FORCE" -H "${H}") +# parse the survey name and testid from the VM name +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +PS_TOPIC_DEFAULT="${survey}-alerts_raw" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" + PS_TOPIC_DEFAULT="${PS_TOPIC_DEFAULT}-${testid}" +fi + +#--- Download config files from GCS +# remove all files +rm -r "${brokerdir}" +# download fresh files +mkdir "${brokerdir}" +cd ${brokerdir} || exit +gsutil -m cp -r "gs://${broker_bucket}/consumer" . +# wait. otherwise the script may continue before all files are downloaded, with adverse behavior. 
+sleep 30s + +#--- Set the topic names to the "FORCE" metadata attributes if exist, else defaults +KAFKA_TOPIC_DEFAULT="alerts-simulated" +KAFKA_TOPIC="${KAFKA_TOPIC_FORCE:-${KAFKA_TOPIC_DEFAULT}}" +PS_TOPIC="${PS_TOPIC_FORCE:-${PS_TOPIC_DEFAULT}}" +# set VM metadata, just for clarity and easy viewing +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata="PS_TOPIC=${PS_TOPIC},KAFKA_TOPIC=${KAFKA_TOPIC}" + +#--- Files this script will write +workingdir="${brokerdir}/consumer/${survey}" +fout_run="${workingdir}/run-connector.out" +fout_topics="${workingdir}/list.topics" + +#--- Set the connector's configs (Kafka username and password, project, and topics) +# define Rubin-related parameters +#kafka_username="${survey}-${PROJECT_ID}-kafka-username" +kafka_password="${survey}-${PROJECT_ID}-kafka-password" +#KAFKA_USERNAME=$(gcloud secrets versions access latest --secret="${kafka_username}") +KAFKA_PASSWORD=$(gcloud secrets versions access latest --secret="${kafka_password}") + +cd "${workingdir}" || exit + +fconfig=admin.properties +#sed -i "s/KAFKA_USERNAME/${KAFKA_USERNAME}/g" ${fconfig} +sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} + +fconfig=psconnect-worker.properties +#sed -i "s/KAFKA_USERNAME/${KAFKA_USERNAME}/g" ${fconfig} +sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} + +fconfig=ps-connector.properties +sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} +sed -i "s/PS_TOPIC/${PS_TOPIC}/g" ${fconfig} +sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} + +#--- Check until alerts start streaming into the topic +alerts_flowing=false +while [ "${alerts_flowing}" = false ] +do + # get list of topics and dump to file + /bin/kafka-topics \ + --bootstrap-server usdf-alert-stream-dev.lsst.cloud:9094 \ + --list \ + --command-config "${workingdir}/admin.properties" \ + > "${fout_topics}" + + # check if our topic is in the list + if grep -Fq "${KAFKA_TOPIC}" "${fout_topics}" + then + alerts_flowing=true # start consuming + 
else + sleep 60s # sleep 1 min, then try again + fi +done + +#--- Start the Kafka -> Pub/Sub connector, save stdout and stderr to file +/bin/connect-standalone \ + "${workingdir}/psconnect-worker.properties" \ + "${workingdir}/ps-connector.properties" \ + &>> "${fout_run}" diff --git a/broker/setup_broker/rubin/create_vms.sh b/broker/setup_broker/rubin/create_vms.sh new file mode 100644 index 00000000..8b7ad6e8 --- /dev/null +++ b/broker/setup_broker/rubin/create_vms.sh @@ -0,0 +1,46 @@ +#! /bin/bash +# Creates or deletes the GCP VM instances needed by the broker. +# This script will not delete VMs that are in production + + +broker_bucket=$1 # name of GCS bucket where broker files are staged +testid="${2:-test}" +# "False" uses production resources +# any other string will be appended to the names of all resources +teardown="${3:-False}" # "True" tearsdown/deletes resources, else setup +survey="${4:-rubin}" +# name of the survey this broker instance will ingest +zone="${5:-us-central1-a}" +firewallrule="${6:-tcpport9094}" + +#--- GCP resources used in this script +consumerVM="${survey}-consumer" +# use test resources, if requested +if [ "$testid" != "False" ]; then + consumerVM="${consumerVM}-${testid}" +fi + +#--- Teardown resources +if [ "$teardown" = "True" ]; then + # ensure that we do not teardown production resources + if [ "$testid" != "False" ]; then + gcloud compute instances delete "$consumerVM" --zone="$zone" + fi + +#--- Create resources +else +#--- Consumer VM + # create VM + machinetype=e2-standard-2 + # metadata + googlelogging="google-logging-enabled=true" + startupscript="startup-script-url=gs://${broker_bucket}/consumer/${survey}/vm_install.sh" + shutdownscript="shutdown-script-url=gs://${broker_bucket}/consumer/${survey}/vm_shutdown.sh" + gcloud compute instances create "${consumerVM}" \ + --zone="${zone}" \ + --machine-type="${machinetype}" \ + --scopes=cloud-platform \ + --metadata="${googlelogging},${startupscript},${shutdownscript}" \ + 
--tags="${firewallrule}" + +fi diff --git a/broker/setup_broker/rubin/setup_broker.sh b/broker/setup_broker/rubin/setup_broker.sh new file mode 100644 index 00000000..acc44133 --- /dev/null +++ b/broker/setup_broker/rubin/setup_broker.sh @@ -0,0 +1,89 @@ +#! /bin/bash +# Create and configure GCP resources needed to run the nightly broker. + +testid="${1:-test}" +# "False" uses production resources +# any other string will be appended to the names of all resources +teardown="${2:-False}" +# "True" tearsdown/deletes resources, else setup +survey="${3:-rubin}" +# name of the survey this broker instance will ingest +region="${4:-us-central1}" +zone="${region}-a" # just use zone "a" instead of adding another script arg + +PROJECT_ID=$GOOGLE_CLOUD_PROJECT # get the environment variable + +#--- Make the user confirm the settings +echo +echo "setup_broker.sh will run with the following configs: " +echo +echo "GOOGLE_CLOUD_PROJECT = ${PROJECT_ID}" +echo "survey = ${survey}" +echo "testid = ${testid}" +echo "teardown = ${teardown}" +echo +echo "Continue? [y/(n)]: " + +read continue_with_setup +continue_with_setup="${continue_with_setup:-n}" +if [ "$continue_with_setup" != "y" ]; then + echo "Exiting setup." + echo + exit +fi + +#--- GCP resources used directly in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +topic_alerts="${survey}-alerts" +# use test resources, if requested +# (there must be a better way to do this) +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" + topic_alerts="${topic_alerts}-${testid}" +fi + + +#--- Create (or delete) GCS, Pub/Sub resources +if [ "${teardown}" != "True" ]; then + # create broker bucket and upload files + echo "Creating broker_bucket and uploading files..." + gsutil mb -b on -l "${region}" "gs://${broker_bucket}" + ./upload_broker_bucket.sh "${broker_bucket}" + + # create pubsub + echo "Configuring Pub/Sub resources..." 
+ gcloud pubsub topics create "${topic_alerts}" + + # Set IAM policies on resources + user="allUsers" + roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic" + gcloud pubsub topics add-iam-policy-binding "${topic_alerts}" --member="${user}" --role="${roleid}" + +else + # ensure that we do not teardown production resources + if [ "${testid}" != "False" ]; then + o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs + gsutil -m -o "${o}" rm -r "gs://${broker_bucket}" + gcloud pubsub topics delete "${topic_alerts}" + fi +fi + +if [ "$teardown" != "True" ]; then + #--- Create a firewall rule to open the port used by Kafka/Rubin + # on any instance with the flag --tags=tcpport9094 + echo + echo "Configuring Rubin/Kafka firewall rule..." + firewallrule="tcpport9094" + gcloud compute firewall-rules create "${firewallrule}" \ + --allow=tcp:9094 \ + --description="Allow incoming traffic on TCP port 9094" \ + --direction=INGRESS \ + --enable-logging +fi + +#--- Create VM instances +echo +echo "Configuring VMs..." +./create_vms.sh "${broker_bucket}" "${testid}" "${teardown}" "${survey}" "${zone}" "${firewallrule}" + + diff --git a/broker/setup_broker/rubin/upload_broker_bucket.sh b/broker/setup_broker/rubin/upload_broker_bucket.sh new file mode 100755 index 00000000..2f6c0471 --- /dev/null +++ b/broker/setup_broker/rubin/upload_broker_bucket.sh @@ -0,0 +1,8 @@ +#! /bin/bash + +broker_bucket=$1 # name of GCS bucket where broker files should be staged + +echo +echo "Uploading broker files to GCS..." 
+o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs +gsutil -m -o "${o}" cp -r ../../consumer "gs://${broker_bucket}" \ No newline at end of file From e7d5eadfc8eef9ebd8223fae05997b840d4e1196 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Mon, 20 May 2024 12:56:06 -0400 Subject: [PATCH 02/15] address codacy issues --- broker/consumer/rubin/vm_install.sh | 5 +++-- broker/setup_broker/rubin/setup_broker.sh | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/broker/consumer/rubin/vm_install.sh b/broker/consumer/rubin/vm_install.sh index f147fa40..0c51bb04 100644 --- a/broker/consumer/rubin/vm_install.sh +++ b/broker/consumer/rubin/vm_install.sh @@ -39,8 +39,9 @@ echo "Installing Java..." apt install -y default-jre apt install -y default-jdk echo 'JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"' >> /etc/environment +# shellcheck source=/dev/null source /etc/environment -echo $JAVA_HOME +echo "$JAVA_HOME" echo "Done installing Java." apt update @@ -62,7 +63,7 @@ plugindir=/usr/local/share/kafka/plugins CONNECTOR_RELEASE=v0.5-alpha mkdir -p ${plugindir} #- install the connector -cd ${plugindir} +cd ${plugindir} || exit wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR_RELEASE}/pubsub-kafka-connector.jar echo "Done installing the Kafka -> Pub/Sub connector" diff --git a/broker/setup_broker/rubin/setup_broker.sh b/broker/setup_broker/rubin/setup_broker.sh index acc44133..76699c1e 100644 --- a/broker/setup_broker/rubin/setup_broker.sh +++ b/broker/setup_broker/rubin/setup_broker.sh @@ -24,7 +24,7 @@ echo "teardown = ${teardown}" echo echo "Continue? [y/(n)]: " -read continue_with_setup +read -r continue_with_setup continue_with_setup="${continue_with_setup:-n}" if [ "$continue_with_setup" != "y" ]; then echo "Exiting setup." 
From 59adbe210871903a6e5f43a33ba12e8c2779ce64 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Mon, 20 May 2024 12:58:15 -0400 Subject: [PATCH 03/15] add pubsub subscription in setup_broker.sh --- broker/setup_broker/rubin/setup_broker.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/setup_broker/rubin/setup_broker.sh b/broker/setup_broker/rubin/setup_broker.sh index 76699c1e..1d5d59f3 100644 --- a/broker/setup_broker/rubin/setup_broker.sh +++ b/broker/setup_broker/rubin/setup_broker.sh @@ -35,11 +35,13 @@ fi #--- GCP resources used directly in this script broker_bucket="${PROJECT_ID}-${survey}-broker_files" topic_alerts="${survey}-alerts" +pubsub_subscription="${topic_alerts}" # use test resources, if requested # (there must be a better way to do this) if [ "$testid" != "False" ]; then broker_bucket="${broker_bucket}-${testid}" topic_alerts="${topic_alerts}-${testid}" + pubsub_subscription="${pubsub_subscription}-${testid}" fi @@ -53,6 +55,7 @@ if [ "${teardown}" != "True" ]; then # create pubsub echo "Configuring Pub/Sub resources..." gcloud pubsub topics create "${topic_alerts}" + gcloud pubsub subscriptions create "${pubsub_subscription}" --topic="${topic_alerts}" # Set IAM policies on resources user="allUsers" @@ -65,6 +68,7 @@ else o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs gsutil -m -o "${o}" rm -r "gs://${broker_bucket}" gcloud pubsub topics delete "${topic_alerts}" + gcloud pubsub subscriptions delete "${pubsub_subscription}" fi fi From 6abaf2dd3af15b269ecdb39afa575c8206c81910 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Tue, 21 May 2024 10:51:07 -0400 Subject: [PATCH 04/15] remove kafka_username env var. 
from vm_startup.sh --- broker/consumer/rubin/vm_startup.sh | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/broker/consumer/rubin/vm_startup.sh b/broker/consumer/rubin/vm_startup.sh index df7a8d8c..fd129d02 100644 --- a/broker/consumer/rubin/vm_startup.sh +++ b/broker/consumer/rubin/vm_startup.sh @@ -53,21 +53,17 @@ workingdir="${brokerdir}/consumer/${survey}" fout_run="${workingdir}/run-connector.out" fout_topics="${workingdir}/list.topics" -#--- Set the connector's configs (Kafka username and password, project, and topics) +#--- Set the connector's configs (Kafka password, project, and topic) # define Rubin-related parameters -#kafka_username="${survey}-${PROJECT_ID}-kafka-username" kafka_password="${survey}-${PROJECT_ID}-kafka-password" -#KAFKA_USERNAME=$(gcloud secrets versions access latest --secret="${kafka_username}") KAFKA_PASSWORD=$(gcloud secrets versions access latest --secret="${kafka_password}") cd "${workingdir}" || exit fconfig=admin.properties -#sed -i "s/KAFKA_USERNAME/${KAFKA_USERNAME}/g" ${fconfig} sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} fconfig=psconnect-worker.properties -#sed -i "s/KAFKA_USERNAME/${KAFKA_USERNAME}/g" ${fconfig} sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" ${fconfig} fconfig=ps-connector.properties From 5a6515cfee389c11359ea94c5f9845e258974be6 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Tue, 21 May 2024 16:46:58 -0400 Subject: [PATCH 05/15] remove blank lines in setup_broker.sh --- broker/setup_broker/rubin/create_vms.sh | 0 broker/setup_broker/rubin/setup_broker.sh | 2 -- 2 files changed, 2 deletions(-) mode change 100644 => 100755 broker/setup_broker/rubin/create_vms.sh mode change 100644 => 100755 broker/setup_broker/rubin/setup_broker.sh diff --git a/broker/setup_broker/rubin/create_vms.sh b/broker/setup_broker/rubin/create_vms.sh old mode 100644 new mode 100755 diff --git a/broker/setup_broker/rubin/setup_broker.sh b/broker/setup_broker/rubin/setup_broker.sh old mode 100644 
new mode 100755 index 1d5d59f3..e3518aad --- a/broker/setup_broker/rubin/setup_broker.sh +++ b/broker/setup_broker/rubin/setup_broker.sh @@ -89,5 +89,3 @@ fi echo echo "Configuring VMs..." ./create_vms.sh "${broker_bucket}" "${testid}" "${teardown}" "${survey}" "${zone}" "${firewallrule}" - - From 5119f76dff51a561bdf789e535c0515f1e2354f2 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Tue, 21 May 2024 16:47:27 -0400 Subject: [PATCH 06/15] update default pubsub topic value in vm_startup.sh --- broker/consumer/rubin/vm_startup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/consumer/rubin/vm_startup.sh b/broker/consumer/rubin/vm_startup.sh index fd129d02..ae86d44c 100644 --- a/broker/consumer/rubin/vm_startup.sh +++ b/broker/consumer/rubin/vm_startup.sh @@ -23,7 +23,7 @@ fi #--- GCP resources used in this script broker_bucket="${PROJECT_ID}-${survey}-broker_files" -PS_TOPIC_DEFAULT="${survey}-alerts_raw" +PS_TOPIC_DEFAULT="${survey}-alerts" # use test resources, if requested if [ "$testid" != "False" ]; then broker_bucket="${broker_bucket}-${testid}" From 527d830d86df940d33f429bdc72f3519dafb24da Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Wed, 22 May 2024 10:21:21 -0400 Subject: [PATCH 07/15] create README in setup_broker/rubin dir --- broker/setup_broker/rubin/README.md | 126 ++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 broker/setup_broker/rubin/README.md diff --git a/broker/setup_broker/rubin/README.md b/broker/setup_broker/rubin/README.md new file mode 100644 index 00000000..0f36a3e6 --- /dev/null +++ b/broker/setup_broker/rubin/README.md @@ -0,0 +1,126 @@ +# Connect Pitt-Google to the Rubin-Broker integration test stream + +May 2024 - Author: Christopher Hernández + +- [Overview](#overview) +- [Setup](#setup) +- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) +- [Delete broker instance](#delete-broker-instance) + +## Overview + +During the first week of May 2024, a broker 
integration exercise was held and focused on the topic of connectivity. +This document outlines my procedure in connecting to the test stream. + +For this exercise, the same credentials as the IDF integration exercise were used. Credential information was emailed to me +by Troy Raen, and the credential value (e.g., `kafka_password`) was stored as a +[secret](https://cloud.google.com/secret-manager/docs/overview#secret) using Google Cloud's +[Secret Manager](https://cloud.google.com/secret-manager/docs/overview). + +Details on the number of alerts in this exercise from a conversation with Brianna Smart: +"The DC2 finished producing alerts, putting 303,288 new alerts into the stream, for a total of 376,800, the first +76,800 of which are a small subset duplicated 3 times." + +## Setup + +This section assumes that you have: +- Set the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` to appropriate values for +your GCP project and service account credentials +- Authenticated the service account to make `gcloud` calls through the project +- Enabled the [Secret Manager API](https://cloud.google.com/secret-manager/docs/configuring-secret-manager#enable_api) +in your Google Cloud Project +- Granted the default compute service account the role of `Secret Manager Secret Accessor` in the +[IAM & Admin page](https://console.cloud.google.com/iam-admin) + +You may want to +[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) +or +[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). 
+ +Create secrets for your access credential: + +```bash +# define parameters +survey="rubin" +PROJECT_ID=$GOOGLE_CLOUD_PROJECT + +# define secret names +kafka_password="${survey}-${PROJECT_ID}-kafka-password" + +# create secret +gcloud secrets create "${kafka_password}" \ + --replication-policy="automatic" +``` + +Select one of the following options to add a secret version. Note that adding a version directly on the command line is +discouraged by Google Cloud, see [add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) for details. + +```bash +# add a secret version from the contents of a file on disk +gcloud secrets versions add "${kafka_password}" \ + --data-file="/path/to/file.txt" + +# add a secret version directly on the command line +# (discouraged by Google Cloud; see the note above) +echo -n "enter the Kafka password" | \ + gcloud secrets versions add "${kafka_password}" \ + --data-file=- +``` + +Clone the repo and cd into the directory: + +```bash +git clone https://github.com/mwvgroup/Pitt-Google-Broker.git +cd Pitt-Google-Broker/broker/setup_broker/rubin +``` + +Define the variables used below. + +```bash +testid="enter testid value" +teardown="False" +survey="rubin" +region="us-central1" +``` + +Execute the `setup_broker.sh` script: + +```bash +./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}" +``` +This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. Once +complete, the VM will shut down automatically. You can check the status of the VM in the +[Google Cloud Console](https://console.cloud.google.com/compute). This entire process should take less than 10 minutes. 
+ +## Ingest the Rubin test stream + +### Setup Consumer VM + +```bash +zone="${region}-a" +consumerVM="${survey}-consumer-${testid}" + +# Set the VM metadata +KAFKA_TOPIC="alerts-simulated" +PS_TOPIC="${survey}-alerts-${testid}" +gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \ + --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}" + +# Start the VM +gcloud compute instances start "${consumerVM}" --zone ${zone} +# this launches the startup script which configures and starts the +# Kafka -> Pub/Sub connector +``` + +## Delete broker instance + +Initialize parameters and call the deployment script: + +```bash +testid="mytest" +teardown="True" +survey="rubin" +region="us-central1" + +./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}" +``` From 3f20af019e6672c8a1d8f497efa4400c62db5602 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:31:59 -0400 Subject: [PATCH 08/15] address whitespace issue in README.md --- broker/setup_broker/rubin/README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/broker/setup_broker/rubin/README.md b/broker/setup_broker/rubin/README.md index 0f36a3e6..71cd5f1c 100644 --- a/broker/setup_broker/rubin/README.md +++ b/broker/setup_broker/rubin/README.md @@ -14,7 +14,7 @@ This document outlines my procedure in connecting to the test stream. For this exercise, the same credentials as the IDF integration exercise were used. Credential information was emailed to me by Troy Raen, and the credential value (e.g., `kafka_password`) was stored as a -[secret](https://cloud.google.com/secret-manager/docs/overview#secret) using Google Cloud's +[secret](https://cloud.google.com/secret-manager/docs/overview#secret) using Google Cloud's [Secret Manager](https://cloud.google.com/secret-manager/docs/overview). 
Details on the number of alerts in this exercise from a conversation with Brianna Smart: @@ -24,12 +24,13 @@ Details on the number of alerts in this exercise from a conversation with Briann ## Setup This section assumes that you have: + - Set the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` to appropriate values for your GCP project and service account credentials - Authenticated the service account to make `gcloud` calls through the project - Enabled the [Secret Manager API](https://cloud.google.com/secret-manager/docs/configuring-secret-manager#enable_api) in your Google Cloud Project -- Granted the default compute service account the role of `Secret Manager Secret Accessor` in the +- Granted the default compute service account the role of `Secret Manager Secret Accessor` in the [IAM & Admin page](https://console.cloud.google.com/iam-admin) You may want to @@ -53,7 +54,9 @@ gcloud secrets create "${kafka_password}" \ ``` Select one of the following options to add a secret version. Note that adding a version directly on the command line is -discouraged by Google Cloud, see [add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) for details. +discouraged by Google Cloud, see +[add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) +for details. ```bash # add a secret version from the contents of a file on disk @@ -88,8 +91,9 @@ Execute the `setup_broker.sh` script: ```bash ./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${region}" ``` -This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. Once -complete, the VM will shut down automatically. You can check the status of the VM in the + +This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. 
Once +complete, the VM will shut down automatically. You can check the status of the VM in the [Google Cloud Console](https://console.cloud.google.com/compute). This entire process should take less than 10 minutes. ## Ingest the Rubin test stream From e4605dcfe08f6101a2da9a0f319d03c2c4967af7 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:39:47 -0400 Subject: [PATCH 09/15] address codacy issues --- broker/setup_broker/rubin/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/setup_broker/rubin/README.md b/broker/setup_broker/rubin/README.md index 71cd5f1c..04decfe3 100644 --- a/broker/setup_broker/rubin/README.md +++ b/broker/setup_broker/rubin/README.md @@ -12,8 +12,8 @@ May 2024 - Author: Christopher Hernández During the first week of May 2024, a broker integration exercise was held and focused on the topic of connectivity. This document outlines my procedure in connecting to the test stream. -For this exercise, the same credentials as the IDF integration exercise were used. Credential information was emailed to me -by Troy Raen, and the credential value (e.g., `kafka_password`) was stored as a +For this exercise, the same credentials as the IDF integration exercise were used. Credential information was emailed +to me by Troy Raen, and the credential value (e.g., `kafka_password`) was stored as a [secret](https://cloud.google.com/secret-manager/docs/overview#secret) using Google Cloud's [Secret Manager](https://cloud.google.com/secret-manager/docs/overview). @@ -54,7 +54,7 @@ gcloud secrets create "${kafka_password}" \ ``` Select one of the following options to add a secret version. Note that adding a version directly on the command line is -discouraged by Google Cloud, see +discouraged by Google Cloud, see [add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) for details. 
From 16d7925609de8738de5b3de92a141db212f3b0cd Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:50:30 -0400 Subject: [PATCH 10/15] rename previous README.md file --- .../rubin/first_rubinstreamingtest_dec2021.md | 344 ++++++++++++++++++ 1 file changed, 344 insertions(+) create mode 100644 broker/consumer/rubin/first_rubinstreamingtest_dec2021.md diff --git a/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md b/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md new file mode 100644 index 00000000..a9daae0b --- /dev/null +++ b/broker/consumer/rubin/first_rubinstreamingtest_dec2021.md @@ -0,0 +1,344 @@ +# Connect Pitt-Google to the Rubin alert stream testing deployment + +December 2021 - Author: Troy Raen + +- [Overview](#overview) +- [Setup](#setup) +- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) +- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it) +- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) + +## Overview + +Details and access credentials were sent to us by Eric Bellm via email. +Spencer Nelson provided some additional details specific to our Kafka Connect consumer. +Here are some links they gave us for reference which were used to set this up: + +- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka) +- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md) +- Schemas are stored at: +- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html). + Spencer says, "Our stream uses Avro for the message values, not keys (we + don't set the key to anything in particular), so you probably want the + `value.converter` properties." 
+- Tools and libraries for VOEvents: + +- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer) + +Rubin alert packets will be Avro serialized, but the schema will not be included with the packet. +There are several ways to handle this. +For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized +alerts after pulling from the Pub/Sub stream. +For other methods, see +[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. + +Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull +messages from the resulting Pub/Sub stream and deserialize the alerts. + +## Setup + +The following assumes you have set the environment variables +`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` +to appropriate values for your GCP project and service account credentials, and that +the service account is authenticated to make `gcloud` calls through the project. +You may want to +[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) +or +[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). + +Clone the repo and cd into the directory: + +```bash +git clone https://github.com/mwvgroup/Pitt-Google-Broker.git +cd Pitt-Google-Broker +``` + +Define variables used below in multiple calls. 
+The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized. + +```bash +PROJECT_ID="${GOOGLE_CLOUD_PROJECT}" +# For reference, I ran this with: +# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing +survey="rubin" +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +consumerVM="${survey}-consumer" +firewallrule="tcpport9094" + +# Kafka credentials for the Rubin stream +KAFKA_USERNAME="pittgoogle-idfint" # set to correct username +KAFKA_PASSWORD="" # set to correct password + +PUBSUB_TOPIC="rubin-alerts" +PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}" +KAFKA_TOPIC="alerts-simulated" +``` + +Set up resources on Google Cloud Platform. + +```bash +# Create a firewall rule to open port 9094 (only needs to be done once, per project) +gcloud compute firewall-rules create "${firewallrule}" \ + --allow=tcp:9094 \ + --description="Allow incoming traffic on TCP port 9094" \ + --direction=INGRESS \ + --enable-logging + +# Create a Cloud Storage bucket to store the consumer config files +gsutil mb "gs://${broker_bucket}" + +# Upload the install script and config files for the consumer +o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs +gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}" + +# Create a Pub/Sub topic and subscription for Rubin alerts +gcloud pubsub topics create "${PUBSUB_TOPIC}" +gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}" + +# Create a Rubin Consumer VM +zone="us-central1-a" +machinetype="e2-standard-2" +installscript="gs://${broker_bucket}/consumer/vm_install.sh" +gcloud compute instances create "${consumerVM}" \ + --zone="${zone}" \ + --machine-type="${machinetype}" \ + --scopes=cloud-platform \ + --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \ + --tags="${firewallrule}" +``` + +## Ingest the Rubin test stream + +### Setup Consumer VM + +```bash +# start the consumer vm and ssh in +gcloud compute instances start "${consumerVM}" +gcloud
compute ssh "${consumerVM}" + +# define some variables +brokerdir=/home/broker # user's home dir on this machine +workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine + +# We will also need the variables defined at the top of this document. +# Go back up to the "Setup" section and define the variables given +# in the code block under "Define variables...", in your environment. +``` + +### Test the connection + +#### Check available Kafka topics + +```bash +/bin/kafka-topics \ + --bootstrap-server alert-stream-int.lsst.cloud:9094 \ + --list \ + --command-config "${workingdir}/admin.properties" +# should see output that includes the topic: alerts-simulated +``` + +#### Test the topic connection using the Kafka Console Consumer + +Set Java env variable + +```bash +export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64" +``` + +Make a file called 'consumer.properties' and fill it with this +(change `KAFKA_PASSWORD` to the appropriate value): + +```bash +security.protocol=SASL_SSL +sasl.mechanism=SCRAM-SHA-512 + +sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ + username="pittgoogle-idfint"\ + password="KAFKA_PASSWORD"; +``` + +Run the Kafka console consumer + +```bash +sudo /bin/kafka-avro-console-consumer \ + --bootstrap-server alert-stream-int.lsst.cloud:9094 \ + --group "${KAFKA_USERNAME}-example-javaconsole" \ + --topic "${KAFKA_TOPIC}" \ + --property schema.registry.url=https://alert-schemas-int.lsst.cloud \ + --consumer.config consumer.properties \ + --timeout-ms=60000 +# if successful, you will see a lot of JSON flood the terminal +``` + +### Run the Kafka -> Pub/Sub connector + +Setup: + +```bash +# download the config files from broker_bucket +sudo mkdir "${brokerdir}" +sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}" + +# set the password in two of the config files +sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties" +sudo sed -i 
"s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties" + +# replace topic and project configs in ps-connector.properties +fconfig="${workingdir}/ps-connector.properties" +sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} +sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig} +sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} +``` + +Run the connector: + +```bash +mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir +fout_run="${mydir}/run-connector.out" +sudo /bin/connect-standalone \ + ${workingdir}/psconnect-worker.properties \ + ${workingdir}/ps-connector.properties \ + &> ${fout_run} +``` + +## Pull a Pub/Sub message and open it + +In the future, we should download schemas from the Confluent Schema Registry and store them. +Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`. +See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. + +For now, use the schema in the `lsst-alert-packet` library. Install the library: + +```bash +pip install lsst-alert-packet +``` + +Following the deserialization example at + + +```python +import io +import fastavro +from google.cloud import pubsub_v1 +from lsst.alert.packet import Schema + +# pull a message +project_id = "avid-heading-329016" +subscription_name = "rubin-alerts" +max_messages = 5 + +subscriber = pubsub_v1.SubscriberClient() +subscription_path = subscriber.subscription_path(project_id, subscription_name) +request = { + "subscription": subscription_path, + "max_messages": max_messages, +} + +response = subscriber.pull(**request) + +# load the schema +latest_schema = Schema.from_file().definition + +# deserialize the alerts. 
+# This follows the deserialization example at +# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py +for received_message in response.received_messages: + alert_bytes = received_message.message.data + # header_bytes = alert_bytes[:5] + # schema_version = deserialize_confluent_wire_header(header_bytes) + content_bytes = io.BytesIO(alert_bytes[5:]) + alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema) + alertId = alert_dict['alertId'] + diaSourceId = alert_dict['diaSource']['diaSourceId'] + psFlux = alert_dict['diaSource']['psFlux'] + print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}") +``` + +## Alternative methods for handling the schema + +### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header + +In the future, we should download schemas from the Confluent Schema Registry and store them +(assuming we do not use the schema registry directly in the Kafka connector). +Then for each alert, check the schema version in the Confluent Wire header, and load the schema +file using `fastavro`. + +Recommendation from Spencer Nelson: + +> You might want to look at how Rubin's alert database ingester works. It does the same steps of +> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet: +> +> +> + +Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached. +We would have to create a new topic for every schema version. +Therefore, I don't think we should do it this way. 
+ +#### Download a schema from the Confluent Schema Registry using a `GET` request + +```bash +SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD +SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud" +schema_version=1 +fout_rubinschema="rubinschema_v${schema_version}.avsc" + +# get list of schema subjects +curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects" +# download a particular schema +curl --silent -X GET -u \ + "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \ + "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \ + > "${fout_rubinschema}" +``` + +#### Read the alert's schema version from the Confluent Wire header + +The following is copied from + + +```python +import struct + +_ConfluentWireFormatHeader = struct.Struct(">bi") + +def deserialize_confluent_wire_header(raw): + """Parses the byte prefix for Confluent Wire Format-style Kafka messages. + Parameters + ---------- + raw : `bytes` + The 5-byte encoded message prefix. + Returns + ------- + schema_version : `int` + A version number which indicates the Confluent Schema Registry ID + number of the Avro schema used to encode the message that follows this + header. + """ + _, version = _ConfluentWireFormatHeader.unpack(raw) + return version + +header_bytes = alert_bytes[:5] +schema_version = deserialize_confluent_wire_header(header_bytes) +``` + +### Use the Confluent Schema Registry with the Kafka Connector + +Kafka Connect can use the Confluent Schema Registry directly. +But schemas are stored under subjects and Kafka Connect is picky about how those +subjects are named. +See + +**Rubin has set the schema subject name to “alert-packet”**, which does not conform +to any of the name strategies that Kafka Connect uses. +I did not find a workaround for this issue. +Instead, I passed the alert bytes straight through into Pub/Sub and deserialized +them after pulling the messages from Pub/Sub. 
+ +If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file. + +```bash +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud +value.converter.enhanced.avro.schema.support=true +``` From 5693d52596e2d42ed51b52fb89060844a99d7841 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:50:52 -0400 Subject: [PATCH 11/15] create new README.md in `broker/consumer/rubin` --- broker/consumer/rubin/README.md | 353 ++------------------------------ 1 file changed, 22 insertions(+), 331 deletions(-) diff --git a/broker/consumer/rubin/README.md b/broker/consumer/rubin/README.md index a9daae0b..eca00a51 100644 --- a/broker/consumer/rubin/README.md +++ b/broker/consumer/rubin/README.md @@ -1,344 +1,35 @@ -# Connect Pitt-Google to the Rubin alert stream testing deployment +# Start the Rubin consumer VM -December 2021 - Author: Troy Raen +See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions. -- [Overview](#overview) -- [Setup](#setup) -- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream) -- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it) -- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) - -## Overview - -Details and access credentials were sent to us by Eric Bellm via email. -Spencer Nelson provided some additional details specific to our Kafka Connect consumer. 
-Here are some links they gave us for reference which were used to set this up: - -- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka) -- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md) -- Schemas are stored at: -- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html). - Spencer says, "Our stream uses Avro for the message values, not keys (we - don't set the key to anything in particular), so you probably want the - `value.converter` properties." -- Tools and libraries for VOEvents: - -- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer) - -Rubin alert packets will be Avro serialized, but the schema will not be included with the packet. -There are several ways to handle this. -For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized -alerts after pulling from the Pub/Sub stream. -For other methods, see -[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. - -Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull -messages from the resulting Pub/Sub stream and deserialize the alerts. - -## Setup - -The following assumes you have set the environment variables -`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` -to appropriate values for your GCP project and service account credentials, and that -the service account is authenticated to make `gcloud` calls through the project. 
-You may want to -[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) -or -[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). - -Clone the repo and cd into the directory: - -```bash -git clone https://github.com/mwvgroup/Pitt-Google-Broker.git -cd Pitt-Google-Broker -``` - -Define variables used below in multiple calls. -The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized +To start the consumer VM: ```bash -PROJECT_ID="${GOOGLE_CLOUD_PROJECT}" -# For reference, I ran this with: -# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing survey="rubin" -broker_bucket="${PROJECT_ID}-${survey}-broker_files" -consumerVM="${survey}-consumer" -firewallrule="tcpport9094" - -# Kafka credentials for the Rubin stream -KAFKA_USERNAME="pittgoogle-idfint" # set to correct username -KAFKA_PASSWORD="" # set to correct password - -PUBSUB_TOPIC="rubin-alerts" -PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}" -KAFKA_TOPIC="alerts-simulated" -``` - -Setup resources on Google Cloud Platform. 
- -```bash -# Create a firewall rule to open port 9094 (only needs to be done once, per project) -gcloud compute firewall-rules create "${firewallrule}" \ - --allow=tcp:9094 \ - --description="Allow incoming traffic on TCP port 9094" \ - --direction=INGRESS \ - --enable-logging - -# Create a Cloud Storage bucket to store the consumer config files -gsutil mb "gs://${broker_bucket}" - -# Upload the install script and config files for the consumer -o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs -gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}" - -# Create a Pub/Sub topic and subscription for Rubin alerts -gcloud pubsub topics create "${PUBSUB_TOPIC}" -gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}" - -# Create a Rubin Consumer VM +testid="mytest" +consumerVM="${survey}-consumer-${testid}" zone="us-central1-a" -machinetype="e2-standard-2" -installscript="gs://${broker_bucket}/consumer/vm_install.sh" -gcloud compute instances create "${consumerVM}" \ - --zone="${zone}" \ - --machine-type="${machinetype}" \ - --scopes=cloud-platform \ - --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \ - --tags="${firewallrule}" -``` -## Ingest the Rubin test stream - -### Setup Consumer VM - -```bash -# start the consumer vm and ssh in -gcloud compute instances start "${consumerVM}" -gcloud compute ssh "${consumerVM}" - -# define some variables -brokerdir=/home/broker # user's home dir on this machine -workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine - -# We will also need the variables defined at the top of this document. -# Go back up to the "Setup" section and define the variables given -# in the code block under "Define variables...", in your environment. 
-``` - -### Test the connection - -#### Check available Kafka topics - -```bash -/bin/kafka-topics \ - --bootstrap-server alert-stream-int.lsst.cloud:9094 \ - --list \ - --command-config "${workingdir}/admin.properties" -# should see output that includes the topic: alerts-simulated -``` - -#### Test the topic connection using the Kafka Console Consumer - -Set Java env variable - -```bash -export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64" -``` - -Make a file called 'consumer.properties' and fill it with this -(change `KAFKA_PASSWORD` to the appropriate value): - -```bash -security.protocol=SASL_SSL -sasl.mechanism=SCRAM-SHA-512 - -sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ - username="pittgoogle-idfint"\ - password="KAFKA_PASSWORD"; -``` - -Run the Kafka console consumer - -```bash -sudo /bin/kafka-avro-console-consumer \ - --bootstrap-server alert-stream-int.lsst.cloud:9094 \ - --group "${KAFKA_USERNAME}-example-javaconsole" \ - --topic "${KAFKA_TOPIC}" \ - --property schema.registry.url=https://alert-schemas-int.lsst.cloud \ - --consumer.config consumer.properties \ - --timeout-ms=60000 -# if successful, you will see a lot of JSON flood the terminal -``` - -### Run the Kafka -> Pub/Sub connector - -Setup: - -```bash -# download the config files from broker_bucket -sudo mkdir "${brokerdir}" -sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}" - -# set the password in two of the config files -sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties" -sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties" - -# replace topic and project configs in ps-connector.properties -fconfig="${workingdir}/ps-connector.properties" -sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} -sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig} -sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} -``` - -Run the connector: - -```bash 
-mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir -fout_run="${mydir}/run-connector.out" -sudo /bin/connect-standalone \ - ${workingdir}/psconnect-worker.properties \ - ${workingdir}/ps-connector.properties \ - &> ${fout_run} -``` - -## Pull a Pub/Sub message and open it - -In the future, we should download schemas from the Confluent Schema Registry and store them. -Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`. -See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below. - -For now, use the schema in the `lsst-alert-packet` library. Install the library: - -```bash -pip install lsst-alert-packet -``` - -Following the deserialization example at - - -```python -import io -import fastavro -from google.cloud import pubsub_v1 -from lsst.alert.packet import Schema - -# pull a message -project_id = "avid-heading-329016" -subscription_name = "rubin-alerts" -max_messages = 5 - -subscriber = pubsub_v1.SubscriberClient() -subscription_path = subscriber.subscription_path(project_id, subscription_name) -request = { - "subscription": subscription_path, - "max_messages": max_messages, -} - -response = subscriber.pull(**request) - -# load the schema -latest_schema = Schema.from_file().definition +# Set the VM metadata +KAFKA_TOPIC="alerts-simulated" +PS_TOPIC="${survey}-alerts-${testid}" +gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \ + --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}" -# deserialize the alerts. 
-# This follows the deserialization example at -# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py -for received_message in response.received_messages: - alert_bytes = received_message.message.data - # header_bytes = alert_bytes[:5] - # schema_version = deserialize_confluent_wire_header(header_bytes) - content_bytes = io.BytesIO(alert_bytes[5:]) - alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema) - alertId = alert_dict['alertId'] - diaSourceId = alert_dict['diaSource']['diaSourceId'] - psFlux = alert_dict['diaSource']['psFlux'] - print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}") +# Start the VM +gcloud compute instances start ${consumerVM} --zone ${zone} +# this launches the startup script which configures and starts the +# Kafka -> Pub/Sub connector ``` -## Alternative methods for handling the schema - -### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header - -In the future, we should download schemas from the Confluent Schema Registry and store them -(assuming we do not use the schema registry directly in the Kafka connector). -Then for each alert, check the schema version in the Confluent Wire header, and load the schema -file using `fastavro`. - -Recommendation from Spencer Nelson: - -> You might want to look at how Rubin's alert database ingester works. It does the same steps of -> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet: -> -> -> - -Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached. -We would have to create a new topic for every schema version. -Therefore, I don't think we should do it this way. 
- -#### Download a schema from the Confluent Schema Registry using a `GET` request +To stop the consumer VM: ```bash -SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD -SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud" -schema_version=1 -fout_rubinschema="rubinschema_v${schema_version}.avsc" - -# get list of schema subjects -curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects" -# download a particular schema -curl --silent -X GET -u \ - "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \ - "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \ - > "${fout_rubinschema}" -``` - -#### Read the alert's schema version from the Confluent Wire header - -The following is copied from - - -```python -import struct - -_ConfluentWireFormatHeader = struct.Struct(">bi") - -def deserialize_confluent_wire_header(raw): - """Parses the byte prefix for Confluent Wire Format-style Kafka messages. - Parameters - ---------- - raw : `bytes` - The 5-byte encoded message prefix. - Returns - ------- - schema_version : `int` - A version number which indicates the Confluent Schema Registry ID - number of the Avro schema used to encode the message that follows this - header. - """ - _, version = _ConfluentWireFormatHeader.unpack(raw) - return version - -header_bytes = alert_bytes[:5] -schema_version = deserialize_confluent_wire_header(header_bytes) -``` - -### Use the Confluent Schema Registry with the Kafka Connector - -Kafka Connect can use the Confluent Schema Registry directly. -But schemas are stored under subjects and Kafka Connect is picky about how those -subjects are named. -See - -**Rubin has set the schema subject name to “alert-packet”**, which does not conform -to any of the name strategies that Kafka Connect uses. -I did not find a workaround for this issue. -Instead, I passed the alert bytes straight through into Pub/Sub and deserialized -them after pulling the messages from Pub/Sub.
- -If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file. +survey="rubin" +testid="mytest" +consumerVM="${survey}-consumer-${testid}" +zone="us-central1-a" -```bash -value.converter=io.confluent.connect.avro.AvroConverter -value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud -value.converter.enhanced.avro.schema.support=true -``` +# Stop the VM +gcloud compute instances stop ${consumerVM} --zone ${zone} +``` \ No newline at end of file From c51edb86181949c4e1fe9f2a6f190bbef35c323e Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:53:58 -0400 Subject: [PATCH 12/15] updated wording in README.md --- broker/setup_broker/rubin/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/setup_broker/rubin/README.md b/broker/setup_broker/rubin/README.md index 04decfe3..454798da 100644 --- a/broker/setup_broker/rubin/README.md +++ b/broker/setup_broker/rubin/README.md @@ -38,7 +38,7 @@ You may want to or [set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). 
-Create secrets for your access credential: +Create a secret for your access credential: ```bash # define parameters From b27d214b5053d5c8a5404c5681855e10cf4bf135 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 09:55:22 -0400 Subject: [PATCH 13/15] README.md now ends with a single newline character --- broker/consumer/rubin/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/consumer/rubin/README.md b/broker/consumer/rubin/README.md index eca00a51..13565e85 100644 --- a/broker/consumer/rubin/README.md +++ b/broker/consumer/rubin/README.md @@ -32,4 +32,4 @@ zone="us-central1-a" # Stop the VM gcloud compute instances stop ${consumerVM} --zone ${zone} -``` \ No newline at end of file +``` From 645710ca27af591f0de3d400572697292c4a7ad7 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 10:36:14 -0400 Subject: [PATCH 14/15] rename file to create_vm.sh --- broker/setup_broker/rubin/{create_vms.sh => create_vm.sh} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename broker/setup_broker/rubin/{create_vms.sh => create_vm.sh} (100%) diff --git a/broker/setup_broker/rubin/create_vms.sh b/broker/setup_broker/rubin/create_vm.sh similarity index 100% rename from broker/setup_broker/rubin/create_vms.sh rename to broker/setup_broker/rubin/create_vm.sh From a2fb70541b1a780308c12410cd16153bd0acdf74 Mon Sep 17 00:00:00 2001 From: Chris Hernandez Date: Thu, 23 May 2024 10:37:20 -0400 Subject: [PATCH 15/15] upload_broker_bucket now ends with a single newline character --- broker/setup_broker/rubin/upload_broker_bucket.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/setup_broker/rubin/upload_broker_bucket.sh b/broker/setup_broker/rubin/upload_broker_bucket.sh index 2f6c0471..ae85979e 100755 --- a/broker/setup_broker/rubin/upload_broker_bucket.sh +++ b/broker/setup_broker/rubin/upload_broker_bucket.sh @@ -5,4 +5,4 @@ broker_bucket=$1 # name of GCS bucket where broker files should be 
staged echo echo "Uploading broker files to GCS..." o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs -gsutil -m -o "${o}" cp -r ../../consumer "gs://${broker_bucket}" \ No newline at end of file +gsutil -m -o "${o}" cp -r ../../consumer "gs://${broker_bucket}"