Skip to content

Commit

Permalink
Add kafka producer to send task status messages
Browse files Browse the repository at this point in the history
To try locally, you can use the oci-env kafka profile from pulp/oci_env#159.

Set up the oci-env to use the kafka profile:

```
COMPOSE_PROFILE=kafka
```

From a fresh oci-env pulp instance, try:

```shell
export REPO_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
export REMOTE_NAME=$(head /dev/urandom | tr -dc a-z | head -c5)
oci-env pulp file repository create --name $REPO_NAME
oci-env pulp file remote create --name $REMOTE_NAME \
    --url 'https://fixtures.pulpproject.org/file/PULP_MANIFEST'
oci-env pulp file repository sync --name $REPO_NAME --remote $REMOTE_NAME
```

Then inspect the kafka message that is produced via:

```shell
oci-env exec -s kafka \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server=localhost:9092 \
  --offset earliest \
  --partition 0 \
  --topic pulpcore.tasking.status \
  --max-messages 1
```

Closes #5337
  • Loading branch information
kahowell committed Jul 18, 2024
1 parent 6fe9bf3 commit c8bc087
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 0 deletions.
55 changes: 55 additions & 0 deletions docs/static/task-status-v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
$schema: http://json-schema.org/draft-07/hyper-schema
$id: https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml
type: object
properties:
pulp_href:
description: URI for the task in the pulp API
type: string
examples:
- /pulp/api/v3/tasks/018f973c-ad7b-7f03-96d0-b38a42c18100/
pulp_created:
description: Created timestamp for the task
type: string
format: date-time
examples:
- 2024-05-20T18:21:27.292394Z
pulp_last_updated:
description: Last updated timestamp for the task
type: string
format: date-time
examples:
- 2024-05-20T18:21:27.292405Z
name:
description: Name of the task
type: string
examples:
- pulp_file.app.tasks.synchronizing.synchronize
state:
description: State of the task
type: string
enum:
- waiting
- skipped
- running
- completed
- failed
- canceled
- canceling
unblocked_at:
description: The time the task became unblocked
type: string
format: date-time
examples:
- 2024-05-20T18:21:27.317792Z
started_at:
description: The time the task started executing
type: string
format: date-time
examples:
- 2024-05-20T18:21:27.349481Z
finished_at:
description: The time the task finished executing
type: string
format: date-time
examples:
- 2024-05-20T18:21:28.074560Z
18 changes: 18 additions & 0 deletions pulpcore/app/serializers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,21 @@ class Meta:
"next_dispatch",
"last_task",
)


class TaskStatusMessageSerializer(TaskSerializer):
"""
Serializer for Task status messages.
Independent of other serializers in order to decouple the task message schema from other interfaces.
"""

class Meta:
model = models.Task
fields = ModelSerializer.Meta.fields + (
"name",
"state",
"unblocked_at",
"started_at",
"finished_at",
)
11 changes: 11 additions & 0 deletions pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@
# By default, use all available workers.
IMPORT_WORKERS_PERCENT = 100

# Kafka settings
KAFKA_BOOTSTRAP_SERVERS = None # kafka integration disabled by default
KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status"
KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False
KAFKA_PRODUCER_POLL_TIMEOUT = 0.1
KAFKA_SECURITY_PROTOCOL = "plaintext"
KAFKA_SSL_CA_PEM = None
KAFKA_SASL_MECHANISM = None
KAFKA_SASL_USERNAME = None
KAFKA_SASL_PASSWORD = None

# HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
# Read more at https://www.dynaconf.com/django/
from dynaconf import DjangoDynaconf, Validator # noqa
Expand Down
65 changes: 65 additions & 0 deletions pulpcore/tasking/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import atexit
import logging
import socket
from threading import Thread
from typing import Optional

from confluent_kafka import Producer
from django.conf import settings

_logger = logging.getLogger(__name__)
_kafka_producer = None
_bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS")
_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
_sasl_username = settings.get("KAFKA_SASL_USERNAME")
_sasl_password = settings.get("KAFKA_SASL_PASSWORD")


class KafkaProducerPollingWorker:
def __init__(self, kafka_producer):
self._kafka_producer = kafka_producer
self._running = False
self._thread = None

def start(self):
self._running = True
self._thread = Thread(target=self._run)
self._thread.start()

def _run(self):
while self._running:
self._kafka_producer.poll(_producer_poll_timeout)
self._kafka_producer.flush()

def stop(self):
self._running = False
self._thread.join()


def get_kafka_producer() -> Optional[Producer]:
global _kafka_producer
if _bootstrap_servers is None:
return None
if _kafka_producer is None:
conf = {
"bootstrap.servers": _bootstrap_servers,
"security.protocol": _security_protocol,
"client.id": socket.gethostname(),
}
optional_conf = {
"ssl.ca.pem": _ssl_ca_pem,
"sasl.mechanisms": _sasl_mechanism,
"sasl.username": _sasl_username,
"sasl.password": _sasl_password,
}
for key, value in optional_conf.items():
if value:
conf[key] = value
_kafka_producer = Producer(conf, logger=_logger)
polling_worker = KafkaProducerPollingWorker(_kafka_producer)
polling_worker.start()
atexit.register(polling_worker.stop)
return _kafka_producer
41 changes: 41 additions & 0 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,32 @@
import traceback
from datetime import timedelta
from gettext import gettext as _
from typing import Optional

# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured
from django.conf import settings
from django.db import connection, transaction
from django.db.models import Model, Max
from django_guid import get_guid
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task
from pulpcore.app.serializers.task import TaskStatusMessageSerializer
from pulpcore.app.util import current_task, get_domain, get_prn
from pulpcore.constants import (
TASK_FINAL_STATES,
TASK_INCOMPLETE_STATES,
TASK_STATES,
TASK_DISPATCH_LOCK,
)
from pulpcore.tasking.kafka import get_kafka_producer

_logger = logging.getLogger(__name__)

_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED")


def _validate_and_get_resources(resources):
resource_set = set()
Expand Down Expand Up @@ -74,9 +84,11 @@ def _execute_task(task):
task.set_failed(exc, tb)
_logger.info(_("Task %s failed (%s)"), task.pk, exc)
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
_send_task_notification(task)
else:
task.set_completed()
_logger.info(_("Task completed %s"), task.pk)
_send_task_notification(task)


def dispatch(
Expand Down Expand Up @@ -250,3 +262,32 @@ def cancel_task(task_id):
cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
cursor.execute("NOTIFY pulp_worker_wakeup")
return task


def _send_task_notification(task):
kafka_producer = get_kafka_producer()
if kafka_producer is not None:
attributes = {
"type": "pulpcore.tasking.status",
"source": "pulpcore.tasking",
"datacontenttype": "application/json",
"dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
}
data = TaskStatusMessageSerializer(task, context={"request": None}).data
task_message = to_structured(CloudEvent(attributes, data))
kafka_producer.produce(
topic=_kafka_tasks_status_topic,
value=task_message.value,
key=task_message.key,
headers=task_message.headers,
on_delivery=_report_message_delivery,
)
if _kafka_tasks_status_producer_sync_enabled:
kafka_producer.flush()


def _report_message_delivery(error, message):
if error is not None:
_logger.error(error)
elif _logger.isEnabledFor(logging.DEBUG):
_logger.debug(f"Message delivery successfully with contents {message.value}")
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ asyncio-throttle>=1.0,<=1.0.2
async-timeout>=4.0.3,<4.0.4;python_version<"3.11"
backoff>=2.1.2,<2.2.2
click>=8.1.0,<=8.1.7
cloudevents==1.10.1 # Pinned because project warns "things might (and will) break with every update"
confluent-kafka~=2.4.0
cryptography>=38.0.1,<42.0.9
Django~=4.2.0 # LTS version, switch only if we have a compelling reason to
django-filter>=23.1,<=24.2
Expand Down
60 changes: 60 additions & 0 deletions staging_docs/admin/guides/integrate-kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Integrate Kafka

Pulp can be configured to emit messages as tasks are created and executed.

Kafka configuration depends on how the kafka broker is configured. Which settings are applicable depends on the broker
configuration.

For a development preview of this functionality, the kafka profile from
[oci_env](https://github.com/pulp/oci_env/pull/159) can be used:

```
COMPOSE_PROFILE=kafka
```

After triggering task(s) any kafka consumer can be used to explore the resulting messages.
For convenience, the previously mentioned `oci_env` setup contains a CLI consumer that can be invoked as follows:

```shell
oci-env exec -s kafka \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server=localhost:9092 \
--offset earliest \
--partition 0 \
--topic pulpcore.tasking.status \
--max-messages 1
```

## Common Configuration

`KAFKA_BOOTSTRAP_SERVERS` is a comma-separated list of hostname and port pairs. Setting this enables the kafka
integration.

Example values:

- `localhost:9092`
- `kafka1.example.com:9092,kafka2.example.com:9092`

## Authentication: Username/Password

In order to use username/password authentication, it's necessary to set an appropriate `KAFKA_SECURITY_PROTOCOL` value:

- `sasl_ssl` when the connection uses TLS.
- `sasl_plaintext` when the connection does not use TLS.

It's also necessary to set the appropriate value for `KAFKA_SASL_MECHANISM`; consult kafka broker configuration, typical
values include:

- `SCRAM-SHA-256`
- `SCRAM-SHA-512`

## TLS Settings

If the TLS truststore needs to be customized, then `KAFKA_SSL_CA_PEM` can be used to provide CA certs in PEM format.

!!! note
The pulp kafka integration does not currently expose settings necessary for mTLS (client certificates).

## Other settings

See [Kafka Settings](../reference/settings.md#kafka-settings) for details.
54 changes: 54 additions & 0 deletions staging_docs/admin/reference/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,57 @@ If `True`, Pulp will anonymously post analytics information to
`analytics docs ` for more info on exactly what is posted along with an example.

Defaults to `True`.

## Kafka Settings

!!! note
Kafka integration functionality is in tech preview and may change based on user feedback.

See [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
for details on client configuration properties.

### KAFKA_BOOTSTRAP_SERVERS

`bootstrap.servers` value for the client. Specifies endpoint(s) for the kafka client. Kafka integration is disabled if
unspecified.

### KAFKA_SECURITY_PROTOCOL

`security.protocol` value for the client. What protocol to use for communication with the broker.

Defaults to `plaintext` (unencrypted).

### KAFKA_SSL_CA_PEM

`ssl.ca.pem` value for the client (optional). Used to override the TLS truststore for broker connections.

### KAFKA_SASL_MECHANISM

`sasl.mechanisms` value for the client (optional). Specifies the authentication method used by the kafka broker.

### KAFKA_SASL_USERNAME

`sasl.username` value for the client (optional). Username for broker authentication.

### KAFKA_SASL_PASSWORD

`sasl.password` value for the client (optional). Password for broker authentication.

### KAFKA_TASKS_STATUS_TOPIC

What kafka topic to emit notifications to when tasks start/stop.

Defaults to `pulpcore.tasking.status`.

### KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED

Whether to synchronously send task status messages. When `True`, the task message is sent synchronously, otherwise the
sends happen asynchronously, with a background thread periodically sending messages to the kafka server.

Defaults to `False`.

### KAFKA_PRODUCER_POLL_TIMEOUT

Timeout in seconds for the kafka producer polling thread's `poll` calls.

Defaults to `0.1`.

0 comments on commit c8bc087

Please sign in to comment.