Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Parse MISP events to update attributes #14

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ plugins:
#kafka:
# topics:
# - misp_attribute
# - misp_event
# poll_interval: 1.0
# # All config entries are passed as-is to librdkafka
# # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Expand Down
3 changes: 3 additions & 0 deletions plugins/apps/threatbus-misp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ make deploy
- `Plugin.Kafka_brokers` -> `172.17.0.1:9092` <- In this example, 172.17.0.1 is the Docker host, reachable from other Docker networks. The port is reachable when the Kafka Docker setup binds to it globally.
- `Plugin.Kafka_attribute_notifications_enable` -> `true`
- `Plugin.Kafka_attribute_notifications_topic` -> `misp_attribute` <- The topic goes into the threatbus `config.yaml`
- `Plugin.Kafka_event_notifications_enable` -> `true`
- `Plugin.Kafka_event_notifications_topic` -> `misp_event` <- The topic goes into the threatbus `config.yaml`

*Install Kafka inside the `misp-server` container*

Expand Down Expand Up @@ -173,6 +175,7 @@ exit # leave the Docker container shell
- Find the ZeroMQ plugin section and enable it
- Go to `Administration` -> `Server Settings & Maintenance` -> `Plugin settings Tab`
- Set the entry `Plugin.ZeroMQ_attribute_notifications_enable` to `true`
- Set the entry `Plugin.ZeroMQ_event_notifications_enable` to `true`

*Restart all MISP services*

Expand Down
56 changes: 30 additions & 26 deletions plugins/apps/threatbus-misp/threatbus_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ def publish_sightings(outq):
lock.release()


def forward_raw(msg_str, inq):
"""Gracefully tries to parse and forward a message string. If the message
cannot be parsed, an error is logged and None is returned.
@param msg_str The message (JSON string) to forward
@param inq The queue to forward successfully parsed message to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@param inq The queue to forward successfully parsed message to
@param inq The queue to forward the successfully parsed message to.

"""
global logger
try:
msg = json.loads(msg_str)
except Exception as e:
logger.error(f"Error decoding message {msg_str}: {e}")
return
all_intel = []
if msg.get("Attribute", None):
intel = map_to_internal(msg["Attribute"], msg.get("action", None), logger)
if intel:
all_intel.append(intel)
elif msg.get("Event", None) and msg.get("action", None) == "delete":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that you are preparing for MISP/MISP#4450 here. While you're at it, how about logging actions other than delete.

When we add an event, I think we don't have to do anything because we get all the contained attributes as well. Is that correct?

Then the only other action left is updating an event. I haven't check this, but I would assume that event updates don't matter for Threat Bus right now, because at this point we only care about attribute updates.

# TODO: find all deleted attributes, add to all_intel
# See https://github.com/MISP/MISP/issues/4450
pass
if not all_intel:
return
for intel in all_intel:
inq.put(intel)


def receive_kafka(kafka_config, inq):
"""Binds a Kafka consumer to the the given host/port. Forwards all received messages to the inq.
@param kafka_config A configuration object for Kafka binding
Expand All @@ -76,19 +103,7 @@ def receive_kafka(kafka_config, inq):
if message.error():
logger.error(f"Kafka error: {message.error()}")
continue
try:
msg = json.loads(message.value())
except Exception as e:
logger.error(f"Error decoding Kafka message: {e}")
continue
if not msg.get("Attribute", None):
logger.debug("Skipping message without MISP Attribute")
continue
intel = map_to_internal(msg["Attribute"], msg.get("action", None), logger)
if not intel:
logger.debug(f"Discarding unparsable intel {msg['Attribute']}")
else:
inq.put(intel)
forward_raw(message.value(), inq)


def receive_zmq(zmq_config, inq):
Expand All @@ -99,8 +114,8 @@ def receive_zmq(zmq_config, inq):

socket = zmq.Context().socket(zmq.SUB)
socket.connect(f"tcp://{zmq_config['host']}:{zmq_config['port']}")
# TODO: allow reception of more topics, i.e. handle events.
socket.setsockopt(zmq.SUBSCRIBE, b"misp_json_attribute")
socket.setsockopt(zmq.SUBSCRIBE, b"misp_json_event")
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

Expand All @@ -109,18 +124,7 @@ def receive_zmq(zmq_config, inq):
if socket in socks and socks[socket] == zmq.POLLIN:
raw = socket.recv()
_, message = raw.decode("utf-8").split(" ", 1)
try:
msg = json.loads(message)
except Exception as e:
logger.error(f"Erro decoding message {message}: {e}")
continue
if not msg.get("Attribute", None):
continue
intel = map_to_internal(msg["Attribute"], msg.get("action", None), logger)
if not intel:
logger.debug(f"Discarding unparsable intel {msg['Attribute']}")
else:
inq.put(intel)
forward_raw(message, inq)


@threatbus.app
Expand Down