Skip to content

Commit

Permalink
refactor and extract client to long lived factory
Browse files Browse the repository at this point in the history
Change-Id: Iaa489f68fa359e353a7e6bfe8289e5c290232bed
  • Loading branch information
sbaeurle committed Mar 29, 2023
1 parent ba11324 commit 777500c
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 69 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.softwarefactory.keycloak.providers.events.models;

public class MQTTMessageOptions {
public boolean retained;
public boolean cleanSession;
public int qos;
public String topic;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,63 @@
package org.softwarefactory.keycloak.providers.events.mqtt;

import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.simple.JSONObject;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;
import org.softwarefactory.keycloak.providers.events.models.Configuration;
import org.keycloak.events.admin.OperationType;
import org.softwarefactory.keycloak.providers.events.models.MQTTMessageOptions;

/**
* @author <a href="mailto:[email protected]">Matthieu Huin</a>
*/
public class MQTTEventListenerProvider implements EventListenerProvider {
private static final Logger logger = Logger.getLogger(MQTTEventListenerProvider.class.getName());

private Configuration configuration;
public static final String PUBLISHER_ID = "keycloak";
private IMqttClient client;

private Set<EventType> excludedEvents;
private Set<OperationType> excludedAdminEvents;
private MQTTMessageOptions messageOptions;

public MQTTEventListenerProvider(Configuration configuration) {
this.configuration = configuration;

public MQTTEventListenerProvider(Set<EventType> excludedEvents, Set<OperationType> excludedAdminEvents, MQTTMessageOptions messageOptions, IMqttClient client) {
this.excludedEvents = excludedEvents;
this.excludedAdminEvents = excludedAdminEvents;
this.client = client;
this.messageOptions = messageOptions;
}

@Override
public void onEvent(Event event) {
// Ignore excluded events
if (configuration.excludedEvents == null || !configuration.excludedEvents.contains(event.getType())) {
if (excludedEvents == null || !excludedEvents.contains(event.getType())) {
sendMqttMessage(convertEvent(event));
}
}

@Override
public void onEvent(AdminEvent event, boolean includeRepresentation) {
// Ignore excluded operations
if (configuration.excludedAdminOperations == null
|| !configuration.excludedAdminOperations.contains(event.getOperationType())) {
if (excludedAdminEvents == null || !excludedAdminEvents.contains(event.getOperationType())) {
sendMqttMessage(convertAdminEvent(event));
}
}

private void sendMqttMessage(String event) {
MemoryPersistence persistence = null;
if (configuration.usePersistence) {
persistence = new MemoryPersistence();
}

try (IMqttClient client = new MqttClient(configuration.serverUri, PUBLISHER_ID, persistence)) {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(configuration.cleanSession);
options.setConnectionTimeout(10);

if (configuration.username != null && configuration.password != null) {
options.setUserName(configuration.username);
options.setPassword(configuration.password.toCharArray());
}

client.connect(options);
try {
logger.log(Level.FINE, "Event: {0}", event);
MqttMessage payload = toPayload(event);
payload.setQos(configuration.qos);
payload.setRetained(configuration.retained);
client.publish(configuration.topic, payload);
client.disconnect();
payload.setQos(messageOptions.qos);
payload.setRetained(messageOptions.retained);
client.publish(messageOptions.topic, payload);
} catch (Exception e) {
logger.log(Level.SEVERE, "Publishing failed!", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,91 @@
package org.softwarefactory.keycloak.providers.events.mqtt;

import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.OperationType;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.softwarefactory.keycloak.providers.events.models.Configuration;
import org.softwarefactory.keycloak.providers.events.models.MQTTMessageOptions;

/**
* @author <a href="mailto:[email protected]">Matthieu Huin</a>
*/
public class MQTTEventListenerProviderFactory implements EventListenerProviderFactory {
private static final Logger logger = Logger.getLogger(MQTTEventListenerProviderFactory.class.getName());
private static final String PUBLISHER_ID = "keycloak";

private Configuration configuration;
private IMqttClient client;
private Set<EventType> excludedEvents;
private Set<OperationType> excludedAdminOperations;
private MQTTMessageOptions messageOptions;

@Override
public EventListenerProvider create(KeycloakSession session) {
return new MQTTEventListenerProvider(configuration);
return new MQTTEventListenerProvider(excludedEvents, excludedAdminOperations, messageOptions, client);
}

@Override
public void init(Config.Scope config) {
configuration = new Configuration();
String[] excludes = config.getArray("exclude-events");
var excludes = config.getArray("excludeEvents");
if (excludes != null) {
configuration.excludedEvents = new HashSet<>();
excludedEvents = new HashSet<EventType>();
for (String e : excludes) {
configuration.excludedEvents.add(EventType.valueOf(e));
excludedEvents.add(EventType.valueOf(e));
}
}

String[] excludesOperations = config.getArray("excludesOperations");
if (excludesOperations != null) {
configuration.excludedAdminOperations = new HashSet<>();
excludedAdminOperations = new HashSet<OperationType>();
for (String e : excludesOperations) {
configuration.excludedAdminOperations.add(OperationType.valueOf(e));
excludedAdminOperations.add(OperationType.valueOf(e));
}
}

configuration.serverUri = config.get("serverUri", "tcp://localhost:1883");
configuration.username = config.get("username", null);
configuration.password = config.get("password", null);
configuration.topic = config.get("topic", "keycloak/events");
configuration.usePersistence = config.getBoolean("usePersistence", false);
configuration.retained = config.getBoolean("retained", true);
configuration.cleanSession = config.getBoolean("cleanSession", true);
configuration.qos = config.getInt("qos", 0);
MqttConnectOptions options = new MqttConnectOptions();
var serverUri = config.get("serverUri", "tcp://localhost:1883");

MemoryPersistence persistence = null;
if (config.getBoolean("usePersistence", false)) {
persistence = new MemoryPersistence();
}

var username = config.get("username", null);
var password = config.get("password", null);
if (username != null && password != null) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
options.setAutomaticReconnect(true);
options.setCleanSession(config.getBoolean("cleanSession", true));
options.setConnectionTimeout(10);

messageOptions = new MQTTMessageOptions();
messageOptions.topic = config.get("topic", "keycloak/events");
messageOptions.retained = config.getBoolean("retained", true);
messageOptions.qos = config.getInt("qos", 0);

try {
client = new MqttClient(serverUri, PUBLISHER_ID, persistence);
client.connect(options);
} catch (MqttSecurityException e){
logger.log(Level.SEVERE, "Connection not secure!", e);
} catch (MqttException e){
logger.log(Level.SEVERE, "Connection could not be established!", e);
}
}

@Override
Expand All @@ -75,7 +112,11 @@ public void postInit(KeycloakSessionFactory factory) {

@Override
public void close() {
// not needed
try {
client.disconnect();
} catch (MqttException e) {
logger.log(Level.SEVERE, "Connection could not be closed!", e);
}
}

@Override
Expand Down

0 comments on commit 777500c

Please sign in to comment.