Skip to content

Commit

Permalink
Redesign kernel events dispatching facility
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Pinčuk <[email protected]>
  • Loading branch information
avpinchuk committed Dec 12, 2024
1 parent 2bff8c3 commit 3075558
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 67 deletions.
4 changes: 4 additions & 0 deletions nucleus/core/kernel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-osgi</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.main.admin</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public static Logger getLogger() {
level = "SEVERE")
public static final String cantDetermineLocation = LOGMSG_PREFIX + "-00040";

@LogMessageInfo(
@LogMessageInfo(
message = "Application deployment failed: {0}",
cause = "The deployment command for an application failed as indicated in the message.",
action = "Check the application and redeploy.",
Expand Down Expand Up @@ -453,11 +453,11 @@ public static Logger getLogger() {
public static final String exceptionAutodeployment = LOGMSG_PREFIX + "-00067";

@LogMessageInfo(
message = "Exception while sending an event.",
cause = "An exception occurred while sending an event.",
message = "Exception while registering an event listener.",
cause = "An exception occurred while registering an event listener.",
action = "Check the system logs and contact support.",
level = "SEVERE")
public static final String exceptionSendEvent = LOGMSG_PREFIX + "-00068";
public static final String exceptionRegisterEventListener = LOGMSG_PREFIX + "-00068";

@LogMessageInfo(
message = "Exception while dispatching an event",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.kernel.event;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

import org.glassfish.api.event.Events;
import org.glassfish.hk2.api.Factory;
import org.glassfish.hk2.api.ServiceLocator;
import org.jvnet.hk2.annotations.Service;

/**
* HK2 factory for the {@link Events} implementations.
*/
@Service
public class EventsFactory implements Factory<Events> {

private static final String BUNDLE_REFERENCE_CLASS_NAME = "org.osgi.framework.BundleReference";

@Inject
private ServiceLocator serviceLocator;

@Override
@Singleton
public Events provide() {
if (isOSGiEnv()) {
return serviceLocator.createAndInitialize(OSGiAwareEventsImpl.class);
}
return serviceLocator.createAndInitialize(EventsImpl.class);
}

@Override
public void dispose(Events events) {
serviceLocator.preDestroy(events);
}

/**
* Determine if we are operating in OSGi environment.
*
* <p>We do this by checking what class loader is used to this class.
*
* @return {@code true} if we are called in the context of OSGi framework, {@code false} otherwise
*/
private boolean isOSGiEnv() {
return isBundleReference(getClass().getClassLoader());
}

/**
* Determines if the specified object is OSGi bundle reference.
*
* @param obj the object to check
* @return {@code true} if the {@code obj} is OSGi bundle reference, {@code false} otherwise
*/
private boolean isBundleReference(Object obj) {
Queue<Class<?>> interfaces = new ArrayDeque<>();
Class<?> c = obj.getClass();
while (c != null && c != Object.class) {
interfaces.addAll(Arrays.asList(c.getInterfaces()));
while (!interfaces.isEmpty()) {
Class<?> interfaceClass = interfaces.poll();
if (BUNDLE_REFERENCE_CLASS_NAME.equals(interfaceClass.getName())) {
return true;
}
interfaces.addAll(Arrays.asList(interfaceClass.getInterfaces()));
}
c = c.getSuperclass();
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import jakarta.inject.Inject;

import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -34,26 +35,46 @@
import org.glassfish.api.event.RestrictTo;
import org.glassfish.deployment.common.DeploymentException;
import org.glassfish.kernel.KernelLoggerInfo;
import org.jvnet.hk2.annotations.Service;

/**
* Simple implementation of the events dispatching facility.
*
* @author Jerome Dochez
*/
@Service
public class EventsImpl implements Events {

private final static Logger logger = KernelLoggerInfo.getLogger();
private final static Logger LOG = KernelLoggerInfo.getLogger();

private final AtomicLong sequenceGenerator = new AtomicLong(0L);

/**
* Use skip list based map to preserve listeners in registration order.
*/
final Map<Listener, EventMatcher> listeners = new ConcurrentSkipListMap<>();

@Inject
private ExecutorService executor;

private final Queue<EventListener> listeners = new ConcurrentLinkedQueue<>();

@Override
public void register(EventListener listener) {
listeners.add(listener);
try {
Method eventMethod = listener.getClass().getMethod("event", Event.class);
RestrictTo[] restrictTo = eventMethod.getParameters()[0].getAnnotationsByType(RestrictTo.class);
EventTypes<?>[] eventTypes = Arrays.stream(restrictTo)
.map(restrict -> EventTypes.create(restrict.value())).toArray(EventTypes[]::new);
listeners.putIfAbsent(new Listener(listener, sequenceGenerator.getAndIncrement()), new EventMatcher(eventTypes));
} catch (Throwable t) {
// We need to catch Throwable, otherwise we can server not to
// shutdown when the following happens:
// Assume a bundle which has registered an event listener
// has been uninstalled without unregistering the listener.
// listener.getClass() refers to a class of such an uninstalled
// bundle. If framework has been refreshed, then the
// classloader can't be used further to load any classes.
// As a result, an exception like NoClassDefFoundError is thrown
// from getMethod.
LOG.log(Level.SEVERE, KernelLoggerInfo.exceptionRegisterEventListener, t);
}
}

@Override
Expand All @@ -63,69 +84,104 @@ public void send(final Event<?> event) {

@Override
public void send(final Event<?> event, boolean asynchronously) {
Iterator<EventListener> iterator = listeners.iterator();
while (iterator.hasNext()) {
EventListener listener = iterator.next();

Method eventMethod;
try {
// Check if the listener is interested with his event.
eventMethod = listener.getClass().getMethod("event", Event.class);
} catch (Throwable t) {
// We need to catch Throwable, otherwise we can server not to
// shutdown when the following happens:
// Assume a bundle which has registered a event listener
// has been uninstalled without unregistering the listener.
// listener.getClass() refers to a class of such an uninstalled
// bundle. If framework has been refreshed, then the
// classloader can't be used further to load any classes.
// As a result, an exception like NoClassDefFoundError is thrown
// from getMethod.
logger.log(Level.SEVERE, KernelLoggerInfo.exceptionSendEvent, t);
iterator.remove();
continue;
}

RestrictTo[] restrictTo = eventMethod.getParameters()[0].getAnnotationsByType(RestrictTo.class);
if (restrictTo.length > 0) {
boolean isInterested = false;
for (RestrictTo restrict : restrictTo) {
EventTypes<?> interestedEvent = EventTypes.create(restrict.value());
if (event.is(interestedEvent)) {
isInterested = true;
break;
}
}

if (!isInterested) {
continue;
}
}

if (asynchronously) {
executor.submit(() -> {
for (Map.Entry<Listener, EventMatcher> entry : listeners.entrySet()) {
EventMatcher matcher = entry.getValue();
// Check if the listener is interested with his event.
if (matcher.matches(event)) {
Listener listener = entry.getKey();
if (asynchronously) {
executor.submit(() -> {
try {
listener.event(event);
} catch (Throwable t) {
LOG.log(Level.WARNING, KernelLoggerInfo.exceptionDispatchEvent, t);
}
});
} else {
try {
listener.event(event);
} catch(Throwable t) {
logger.log(Level.WARNING, KernelLoggerInfo.exceptionDispatchEvent, t);
} catch (DeploymentException e) {
// When synchronous listener throws DeploymentException
// we re-throw the exception to abort the deployment
throw e;
} catch (Throwable t) {
LOG.log(Level.WARNING, KernelLoggerInfo.exceptionDispatchEvent, t);
}
});
} else {
try {
listener.event(event);
} catch (DeploymentException e) {
// When synchronous listener throws DeploymentException
// we re-throw the exception to abort the deployment
throw e;
} catch (Throwable t) {
logger.log(Level.WARNING, KernelLoggerInfo.exceptionDispatchEvent, t);
}
}
}
}

@Override
public boolean unregister(EventListener listener) {
return listeners.remove(listener);
return listeners.remove(new Listener(listener)) != null;
}

/**
* Comparable listener wrapper.
*
* <p>Need to dispatch events in the listener registration order.
*/
static class Listener implements Comparable<Listener> {

private final EventListener eventListener;
private final long sequenceNumber;

Listener(EventListener eventListener) {
this(eventListener, -1L);
}

Listener(EventListener eventListener, long sequenceNumber) {
this.eventListener = eventListener;
this.sequenceNumber = sequenceNumber;
}

void event(Event<?> event) {
eventListener.event(event);
}

EventListener unwrap() {
return eventListener;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Listener)) {
return false;
}
return eventListener.equals(((Listener) obj).eventListener);
}

@Override
public int hashCode() {
return eventListener.hashCode();
}

@Override
public int compareTo(Listener listener) {
return Long.compare(sequenceNumber, listener.sequenceNumber);
}
}

/**
* A class that perform match operations on events.
*/
static class EventMatcher {

private final EventTypes<?>[] eventTypes;

EventMatcher(EventTypes<?>[] eventTypes) {
this.eventTypes = eventTypes;
}

boolean matches(Event<?> event) {
if (eventTypes.length == 0) {
return true;
}
return Arrays.stream(eventTypes).anyMatch(event::is);
}
}
}
Loading

0 comments on commit 3075558

Please sign in to comment.