From 44c53d3e9dac9ae4ebe4d1b481f08dffdb9f4859 Mon Sep 17 00:00:00 2001 From: Kevin Howell Date: Tue, 21 May 2024 09:29:03 -0400 Subject: [PATCH] Add kafka producer to send task status messages To try locally, you can use the oci-env kafka profile from pulp/oci_env#159. Set up the oci-env to use the kafka profile: ``` COMPOSE_PROFILE=kafka ``` From a fresh oci-env pulp instance, try: ```shell export REPO_NAME=$(head /dev/urandom | tr -dc a-z | head -c5) export REMOTE_NAME=$(head /dev/urandom | tr -dc a-z | head -c5) oci-env pulp file repository create --name $REPO_NAME oci-env pulp file remote create --name $REMOTE_NAME \ --url 'https://fixtures.pulpproject.org/file/PULP_MANIFEST' oci-env pulp file repository sync --name $REPO_NAME --remote $REMOTE_NAME ``` Then inspect the kafka message that is produced via: ```shell oci-env exec -s kafka \ /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server=localhost:9092 \ --offset earliest \ --partition 0 \ --topic pulpcore.tasking.status \ --max-messages 1 ``` Closes #5337 --- CHANGES/5337.feature | 1 + docs/static/task-status-v1.yaml | 55 +++++++++++++++++ pulpcore/app/serializers/task.py | 19 ++++++ pulpcore/app/settings.py | 11 ++++ pulpcore/tasking/kafka.py | 65 ++++++++++++++++++++ pulpcore/tasking/tasks.py | 40 ++++++++++++ requirements.txt | 2 + staging_docs/admin/guides/integrate-kafka.md | 60 ++++++++++++++++++ staging_docs/admin/reference/settings.md | 54 ++++++++++++++++ 9 files changed, 307 insertions(+) create mode 100644 CHANGES/5337.feature create mode 100644 docs/static/task-status-v1.yaml create mode 100644 pulpcore/tasking/kafka.py create mode 100644 staging_docs/admin/guides/integrate-kafka.md diff --git a/CHANGES/5337.feature b/CHANGES/5337.feature new file mode 100644 index 0000000000..51db22e42c --- /dev/null +++ b/CHANGES/5337.feature @@ -0,0 +1 @@ +Added kafka integration (tech-preview). diff --git a/docs/static/task-status-v1.yaml b/docs/static/task-status-v1.yaml new file mode 100644 index 0000000000..da21c48aa4 --- /dev/null +++ b/docs/static/task-status-v1.yaml @@ -0,0 +1,55 @@ +$schema: http://json-schema.org/draft-07/hyper-schema +$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml +type: object +properties: + pulp_href: + description: URI for the task in the pulp API + type: string + examples: + - /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/ + pulp_created: + description: Created timestamp for the task + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.292394Z + pulp_last_updated: + description: Last updated timestamp for the task + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.292405Z + name: + description: Name of the task + type: string + examples: + - pulp_file.app.tasks.synchronizing.synchronize + state: + description: State of the task + type: string + enum: + - waiting + - skipped + - running + - completed + - failed + - canceled + - canceling + unblocked_at: + description: The time the task became unblocked + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.317792Z + started_at: + description: The time the task started executing + type: string + format: date-time + examples: + - 2024-05-20T18:21:27.349481Z + finished_at: + description: The time the task finished executing + type: string + format: date-time + examples: + - 2024-05-20T18:21:28.074560Z diff --git a/pulpcore/app/serializers/task.py b/pulpcore/app/serializers/task.py index 88661b2461..9a7bf847c0 100755 --- a/pulpcore/app/serializers/task.py +++ b/pulpcore/app/serializers/task.py @@ -277,3 +277,22 @@ class Meta: "next_dispatch", "last_task", ) + + +class TaskStatusMessageSerializer(TaskSerializer): + """ + Serializer for Task status messages. + + Independent of other serializers in order to decouple the task message schema from other + interfaces. + """ + + class Meta: + model = models.Task + fields = ModelSerializer.Meta.fields + ( + "name", + "state", + "unblocked_at", + "started_at", + "finished_at", + ) diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index 6ba98056e4..3d82530b64 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -329,6 +329,17 @@ # By default, use all available workers. IMPORT_WORKERS_PERCENT = 100 +# Kafka settings +KAFKA_BOOTSTRAP_SERVERS = None # kafka integration disabled by default +KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status" +KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False +KAFKA_PRODUCER_POLL_TIMEOUT = 0.1 +KAFKA_SECURITY_PROTOCOL = "plaintext" +KAFKA_SSL_CA_PEM = None +KAFKA_SASL_MECHANISM = None +KAFKA_SASL_USERNAME = None +KAFKA_SASL_PASSWORD = None + # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py) # Read more at https://www.dynaconf.com/django/ from dynaconf import DjangoDynaconf, Validator # noqa diff --git a/pulpcore/tasking/kafka.py b/pulpcore/tasking/kafka.py new file mode 100644 index 0000000000..37cb9643ce --- /dev/null +++ b/pulpcore/tasking/kafka.py @@ -0,0 +1,65 @@ +import atexit +import logging +import socket +from threading import Thread +from typing import Optional + +from confluent_kafka import Producer +from django.conf import settings + +_logger = logging.getLogger(__name__) +_kafka_producer = None +_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS") +_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT") +_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL") +_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM") +_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM") +_sasl_username = settings.get("KAFKA_SASL_USERNAME") +_sasl_password = settings.get("KAFKA_SASL_PASSWORD") + + +class KafkaProducerPollingWorker: + def __init__(self, kafka_producer): + self._kafka_producer = kafka_producer + self._running = False + self._thread = None + + def start(self): + self._running = True + self._thread = Thread(target=self._run) + self._thread.start() + + def _run(self): + while self._running: + self._kafka_producer.poll(_producer_poll_timeout) + self._kafka_producer.flush() + + def stop(self): + self._running = False + self._thread.join() + + +def get_kafka_producer() -> Optional[Producer]: + global _kafka_producer + if _bootstrap_servers is None: + return None + if _kafka_producer is None: + conf = { + "bootstrap.servers": _bootstrap_servers, + "security.protocol": _security_protocol, + "client.id": socket.gethostname(), + } + optional_conf = { + "ssl.ca.pem": _ssl_ca_pem, + "sasl.mechanisms": _sasl_mechanism, + "sasl.username": _sasl_username, + "sasl.password": _sasl_password, + } + for key, value in optional_conf.items(): + if value: + conf[key] = value + _kafka_producer = Producer(conf, logger=_logger) + polling_worker = KafkaProducerPollingWorker(_kafka_producer) + polling_worker.start() + atexit.register(polling_worker.stop) + return _kafka_producer diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index dfd3151d93..437feb13e4 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -8,11 +8,16 @@ from datetime import timedelta from gettext import gettext as _ +# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols +from cloudevents.http import CloudEvent +from cloudevents.kafka import to_structured +from django.conf import settings from django.db import connection, transaction from django.db.models import Model, Max from django_guid import get_guid from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS from pulpcore.app.models import Task +from pulpcore.app.serializers.task import TaskStatusMessageSerializer from pulpcore.app.util import current_task, get_domain, get_prn from pulpcore.constants import ( TASK_FINAL_STATES, @@ -20,9 +25,13 @@ TASK_STATES, TASK_DISPATCH_LOCK, ) +from pulpcore.tasking.kafka import get_kafka_producer _logger = logging.getLogger(__name__) +_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC") +_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED") + def _validate_and_get_resources(resources): resource_set = set() @@ -74,9 +83,11 @@ def _execute_task(task): task.set_failed(exc, tb) _logger.info(_("Task %s failed (%s)"), task.pk, exc) _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb)))) + _send_task_notification(task) else: task.set_completed() _logger.info(_("Task completed %s"), task.pk) + _send_task_notification(task) def dispatch( @@ -250,3 +261,32 @@ def cancel_task(task_id): cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),)) cursor.execute("NOTIFY pulp_worker_wakeup") return task + + +def _send_task_notification(task): + kafka_producer = get_kafka_producer() + if kafka_producer is not None: + attributes = { + "type": "pulpcore.tasking.status", + "source": "pulpcore.tasking", + "datacontenttype": "application/json", + "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml", + } + data = TaskStatusMessageSerializer(task, context={"request": None}).data + task_message = to_structured(CloudEvent(attributes, data)) + kafka_producer.produce( + topic=_kafka_tasks_status_topic, + value=task_message.value, + key=task_message.key, + headers=task_message.headers, + on_delivery=_report_message_delivery, + ) + if _kafka_tasks_status_producer_sync_enabled: + kafka_producer.flush() + + +def _report_message_delivery(error, message): + if error is not None: + _logger.error(error) + elif _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Message delivery successfully with contents {message.value}") diff --git a/requirements.txt b/requirements.txt index 61bbd75d5e..7e79e0182d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,8 @@ asyncio-throttle>=1.0,<=1.0.2 async-timeout>=4.0.3,<4.0.4;python_version<"3.11" backoff>=2.1.2,<2.2.2 click>=8.1.0,<=8.1.7 +cloudevents==1.10.1 # Pinned because project warns "things might (and will) break with every update" +confluent-kafka~=2.4.0 cryptography>=38.0.1,<42.0.9 Django~=4.2.0 # LTS version, switch only if we have a compelling reason to django-filter>=23.1,<=24.2 diff --git a/staging_docs/admin/guides/integrate-kafka.md b/staging_docs/admin/guides/integrate-kafka.md new file mode 100644 index 0000000000..004b1dd5f4 --- /dev/null +++ b/staging_docs/admin/guides/integrate-kafka.md @@ -0,0 +1,60 @@ +# Integrate Kafka + +Pulp can be configured to emit messages as tasks are created and executed. + +Kafka configuration depends on how the kafka broker is configured. Which settings are applicable depends on the broker +configuration. + +For a development preview of this functionality, the kafka profile from +[oci_env](https://github.com/pulp/oci_env/pull/159) can be used: + +``` +COMPOSE_PROFILE=kafka +``` + +After triggering task(s) any kafka consumer can be used to explore the resulting messages. +For convenience, the previously mentioned `oci_env` setup contains a CLI consumer that can be invoked as follows: + +```shell +oci-env exec -s kafka \ + /opt/kafka/bin/kafka-console-consumer.sh \ + --bootstrap-server=localhost:9092 \ + --offset earliest \ + --partition 0 \ + --topic pulpcore.tasking.status \ + --max-messages 1 +``` + +## Common Configuration + +`KAFKA_BOOTSTRAP_SERVERS` is a comma-separated list of hostname and port pairs. Setting this enables the kafka +integration. + +Example values: + +- `localhost:9092` +- `kafka1.example.com:9092,kafka2.example.com:9092` + +## Authentication: Username/Password + +In order to use username/password authentication, it's necessary to set an appropriate `KAFKA_SECURITY_PROTOCOL` value: + +- `sasl_ssl` when the connection uses TLS. +- `sasl_plaintext` when the connection does not use TLS. + +It's also necessary to set the appropriate value for `KAFKA_SASL_MECHANISM`; consult kafka broker configuration, typical +values include: + +- `SCRAM-SHA-256` +- `SCRAM-SHA-512` + +## TLS Settings + +If the TLS truststore needs to be customized, then `KAFKA_SSL_CA_PEM` can be used to provide CA certs in PEM format. + +!!! note + The pulp kafka integration does not currently expose settings necessary for mTLS (client certificates). + +## Other settings + +See [Kafka Settings](../reference/settings.md#kafka-settings) for details. \ No newline at end of file diff --git a/staging_docs/admin/reference/settings.md b/staging_docs/admin/reference/settings.md index 0d95f11dca..e8cfb8363d 100644 --- a/staging_docs/admin/reference/settings.md +++ b/staging_docs/admin/reference/settings.md @@ -403,3 +403,57 @@ If `True`, Pulp will anonymously post analytics information to `analytics docs ` for more info on exactly what is posted along with an example. Defaults to `True`. + +## Kafka Settings + +!!! note + Kafka integration functionality is in tech preview and may change based on user feedback. + +See [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) +for details on client configuration properties. + +### KAFKA_BOOTSTRAP_SERVERS + +`bootstrap.servers` value for the client. Specifies endpoint(s) for the kafka client. Kafka integration is disabled if +unspecified. + +### KAFKA_SECURITY_PROTOCOL + +`security.protocol` value for the client. What protocol to use for communication with the broker. + +Defaults to `plaintext` (unencrypted). + +### KAFKA_SSL_CA_PEM + +`ssl.ca.pem` value for the client (optional). Used to override the TLS truststore for broker connections. + +### KAFKA_SASL_MECHANISM + +`sasl.mechanisms` value for the client (optional). Specifies the authentication method used by the kafka broker. + +### KAFKA_SASL_USERNAME + +`sasl.username` value for the client (optional). Username for broker authentication. + +### KAFKA_SASL_PASSWORD + +`sasl.password` value for the client (optional). Password for broker authentication. + +### KAFKA_TASKS_STATUS_TOPIC + +What kafka topic to emit notifications to when tasks start/stop. + +Defaults to `pulpcore.tasking.status`. + +### KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED + +Whether to synchronously send task status messages. When `True`, the task message is sent synchronously, otherwise the +sends happen asynchronously, with a background thread periodically sending messages to the kafka server. + +Defaults to `False`. + +### KAFKA_PRODUCER_POLL_TIMEOUT + +Timeout in seconds for the kafka producer polling thread's `poll` calls. + +Defaults to `0.1`.