Skip to content

Commit

Permalink
make qos, retained and cleanSession configurable
Browse files Browse the repository at this point in the history
refactor and make qos, retained and cleanSession configurable

Change-Id: I6760d9222199d7fcf184aa5ff128afe8e827d251
  • Loading branch information
sbaeurle committed Mar 24, 2023
1 parent d2f13ea commit 76dc72c
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 135 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

A Keycloak SPI that publishes events to a MQTT broker.

This SPI has been deployed successfully on a containerized Keycloak 15.0.2
This SPI has been deployed successfully on a containerized Keycloak 19.0.2
and on a Keycloak 19.0 server on a kubernetes cluster. It should therefore
work properly on any version of Keycloak above 15.0.2.
work properly on any version of Keycloak above 19.0.2.

# Build

Expand Down Expand Up @@ -36,6 +36,9 @@ And add below:
<property name="password" value="mqtt_password"/>
<property name="topic" value="my_topic"/>
<property name="usePersistence" value="true">
<property name="retained" value="true">
<property name="cleanSession" value="true">
<property name="qos" value="0">
</properties>
</provider>
</spi>
Expand All @@ -58,6 +61,9 @@ kc.sh start
--spi-events-listener-mqtt-password mqtt_password \
--spi-events-listener-mqtt-topic my_topic
--spi-events-listener-mqtt-use-persistence true
--spi-events-listener-mqtt-retained true
--spi-events-listener-mqtt-clean-session true
--spi-events-listener-mqtt-qos 0
```

# Trying it out
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.softwarefactory.keycloak.providers.events.models;

import java.util.Set;

import org.keycloak.events.EventType;
import org.keycloak.events.admin.OperationType;

public class Configuration {
public Set<EventType> excludedEvents;
public Set<OperationType> excludedAdminOperations;
public String serverUri;
public String username;
public String password;
public String topic;
public boolean usePersistence;
public boolean retained;
public boolean cleanSession;
public int qos;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,136 +17,86 @@

package org.softwarefactory.keycloak.providers.events.mqtt;

import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.events.admin.OperationType;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.json.simple.JSONObject;

import java.util.Map;
import java.util.Set;
import java.lang.Exception;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.admin.AdminEvent;
import org.softwarefactory.keycloak.providers.events.models.Configuration;

/**
* @author <a href="mailto:[email protected]">Matthieu Huin</a>
*/
public class MQTTEventListenerProvider implements EventListenerProvider {
private static final Logger logger = Logger.getLogger(MQTTEventListenerProvider.class.getName());

private Configuration configuration;
public static final String PUBLISHER_ID = "keycloak";

private Set<EventType> excludedEvents;
private Set<OperationType> excludedAdminOperations;
private String serverUri;
private String username;
private String password;
public static final String publisherId = "keycloak";
public String TOPIC;
public boolean usePersistence;

public MQTTEventListenerProvider(Set<EventType> excludedEvents, Set<OperationType> excludedAdminOperations, String serverUri, String username, String password, String topic, boolean usePersistence) {
this.excludedEvents = excludedEvents;
this.excludedAdminOperations = excludedAdminOperations;
this.serverUri = serverUri;
this.username = username;
this.password = password;
this.TOPIC = topic;
this.usePersistence = usePersistence;
public MQTTEventListenerProvider(Configuration configuration) {
this.configuration = configuration;
}

@Override
public void onEvent(Event event) {
// Ignore excluded events
if (excludedEvents != null && excludedEvents.contains(event.getType())) {
return;
} else {
String stringEvent = toString(event);
try {
MemoryPersistence persistence = null;
if (this.usePersistence == true) {
persistence = new MemoryPersistence();
}
MqttClient client = new MqttClient(this.serverUri ,publisherId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
if (this.username != null && this.password != null) {
options.setUserName(this.username);
options.setPassword(this.password.toCharArray());
}
client.connect(options);
System.out.println("EVENT: " + stringEvent);
MqttMessage payload = toPayload(stringEvent);
payload.setQos(0);
payload.setRetained(true);
client.publish(this.TOPIC, payload);
client.disconnect();
} catch(Exception e) {
// ?
System.out.println("Caught the following error: " + e.toString());
e.printStackTrace();
return;
}
if (configuration.excludedEvents == null || !configuration.excludedEvents.contains(event.getType())) {
sendMqttMessage(convertEvent(event));
}
}

@Override
public void onEvent(AdminEvent event, boolean includeRepresentation) {
// Ignore excluded operations
if (excludedAdminOperations != null && excludedAdminOperations.contains(event.getOperationType())) {
return;
} else {
String stringEvent = toString(event);
try {
MemoryPersistence persistence = null;
if (this.usePersistence == true) {
persistence = new MemoryPersistence();
}
MqttClient client = new MqttClient(this.serverUri ,publisherId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
if (this.username != null && this.password != null) {
options.setUserName(this.username);
options.setPassword(this.password.toCharArray());
}
client.connect(options);
// System.out.println("EVENT: " + stringEvent);
MqttMessage payload = toPayload(stringEvent);
payload.setQos(0);
payload.setRetained(true);
client.publish(this.TOPIC, payload);
client.disconnect();
} catch(Exception e) {
// ?
System.out.println("Caught the following error: " + e.toString());
e.printStackTrace();
return;
}
if (configuration.excludedAdminOperations == null
|| !configuration.excludedAdminOperations.contains(event.getOperationType())) {
sendMqttMessage(convertAdminEvent(event));
}
}

private void sendMqttMessage(String event) {
MemoryPersistence persistence = null;
if (configuration.usePersistence) {
persistence = new MemoryPersistence();
}

try (IMqttClient client = new MqttClient(configuration.serverUri, PUBLISHER_ID, persistence)) {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(configuration.cleanSession);
options.setConnectionTimeout(10);

if (configuration.username != null && configuration.password != null) {
options.setUserName(configuration.username);
options.setPassword(configuration.password.toCharArray());
}

client.connect(options);
logger.log(Level.FINE, "Event: {0}", event);
MqttMessage payload = toPayload(event);
payload.setQos(configuration.qos);
payload.setRetained(configuration.retained);
client.publish(configuration.topic, payload);
client.disconnect();
} catch (Exception e) {
logger.log(Level.SEVERE, "Event: {0}", e.getStackTrace());
}
}

private MqttMessage toPayload(String s) {
byte[] payload = s.getBytes();
return new MqttMessage(payload);
}

private String toString(Event event) {
JSONObject obj = toJSON(event);
return obj.toString();

}

private JSONObject toJSON(Event event) {
private String convertEvent(Event event) {
JSONObject ev = new JSONObject();

ev.put("type", event.getType().toString());
Expand All @@ -157,7 +107,7 @@ private JSONObject toJSON(Event event) {
ev.put("time", event.getTime());

ev.put("error", event.getError());

JSONObject evDetails = new JSONObject();
if (event.getDetails() != null) {
for (Map.Entry<String, String> e : event.getDetails().entrySet()) {
Expand All @@ -166,19 +116,13 @@ private JSONObject toJSON(Event event) {
}
ev.put("details", evDetails);

return ev;
}

private String toString(AdminEvent adminEvent) {
JSONObject obj = toJSON(adminEvent);
return obj.toString();

return ev.toString();
}

private JSONObject toJSON(AdminEvent adminEvent) {
private String convertAdminEvent(AdminEvent adminEvent) {
JSONObject ev = new JSONObject();

ev.put("type",adminEvent.getOperationType().toString());
ev.put("type", adminEvent.getOperationType().toString());
ev.put("realmId", adminEvent.getAuthDetails().getRealmId());
ev.put("clientId", adminEvent.getAuthDetails().getClientId());
ev.put("userId", adminEvent.getAuthDetails().getUserId());
Expand All @@ -189,11 +133,11 @@ private JSONObject toJSON(AdminEvent adminEvent) {

ev.put("error", adminEvent.getError());

return ev;
return ev.toString();
}

@Override
public void close() {
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,72 +17,69 @@

package org.softwarefactory.keycloak.providers.events.mqtt;

import java.util.HashSet;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.OperationType;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

import java.util.HashSet;
import java.util.Set;
import java.lang.Exception;
import org.softwarefactory.keycloak.providers.events.models.Configuration;

/**
* @author <a href="mailto:[email protected]">Matthieu Huin</a>
*/
public class MQTTEventListenerProviderFactory implements EventListenerProviderFactory {

private Set<EventType> excludedEvents;
private Set<OperationType> excludedAdminOperations;
private String serverUri;
private String username;
private String password;
private String topic;
private boolean usePersistence;
private Configuration configuration;

@Override
public EventListenerProvider create(KeycloakSession session) {
return new MQTTEventListenerProvider(excludedEvents, excludedAdminOperations, serverUri, username, password, topic, usePersistence);
return new MQTTEventListenerProvider(configuration);
}

@Override
public void init(Config.Scope config) {
configuration = new Configuration();
String[] excludes = config.getArray("exclude-events");
if (excludes != null) {
excludedEvents = new HashSet<>();
configuration.excludedEvents = new HashSet<>();
for (String e : excludes) {
excludedEvents.add(EventType.valueOf(e));
configuration.excludedEvents.add(EventType.valueOf(e));
}
}

String[] excludesOperations = config.getArray("excludesOperations");
if (excludesOperations != null) {
excludedAdminOperations = new HashSet<>();
configuration.excludedAdminOperations = new HashSet<>();
for (String e : excludesOperations) {
excludedAdminOperations.add(OperationType.valueOf(e));
configuration.excludedAdminOperations.add(OperationType.valueOf(e));
}
}

serverUri = config.get("serverUri", "tcp://localhost:1883");
username = config.get("username", null);
password = config.get("password", null);
topic = config.get("topic", "keycloak/events");
usePersistence = config.getBoolean("usePersistence", false);
configuration.serverUri = config.get("serverUri", "tcp://localhost:1883");
configuration.username = config.get("username", null);
configuration.password = config.get("password", null);
configuration.topic = config.get("topic", "keycloak/events");
configuration.usePersistence = config.getBoolean("usePersistence", false);
configuration.retained = config.getBoolean("retained", true);
configuration.cleanSession = config.getBoolean("cleanSession", true);
configuration.qos = config.getInt("qos", 0);
}

@Override
public void postInit(KeycloakSessionFactory factory) {

// not needed
}

@Override
public void close() {
// not needed
}

@Override
public String getId() {
return "mqtt";
}

}
}

0 comments on commit 76dc72c

Please sign in to comment.