Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stork observability support #681

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar, boolean)",
"new": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, java.lang.String, java.lang.String, io.smallrye.stork.api.observability.ObservationCollector, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar<?>, boolean)",
"justification": "Implementing observability"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
64 changes: 56 additions & 8 deletions api/src/main/java/io/smallrye/stork/api/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.concurrent.Semaphore;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkObservation;

/**
* Represents a <em>Service</em>.
Expand All @@ -16,23 +18,34 @@ public class Service {
private final Semaphore instanceSelectionLock;
private final LoadBalancer loadBalancer;
private final ServiceDiscovery serviceDiscovery;
private final ServiceRegistrar serviceRegistrar;
private final ServiceRegistrar<?> serviceRegistrar;
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;
private final ObservationCollector observations;

/**
* Creates a new Service.
*
* @param serviceName the name, must not be {@code null}, must not be blank
* @param serviceDiscoveryType the type of the service discovery (for observability purpose)
* @param serviceSelectionType the type of the service selection (for observability purpose)
* @param collector the observation collector, must not be {@code null}
* @param loadBalancer the load balancer, can be {@code null}
* @param serviceDiscovery the service discovery, must not be {@code null}
* @param serviceRegistrar the service registrar, can be {@code null}
* @param requiresStrictRecording whether strict recording must be enabled
*/
public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar serviceRegistrar, boolean requiresStrictRecording) {
public Service(String serviceName,
String serviceSelectionType, String serviceDiscoveryType, ObservationCollector collector,
LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar<?> serviceRegistrar, boolean requiresStrictRecording) {
this.loadBalancer = loadBalancer;
this.serviceDiscovery = serviceDiscovery;
this.serviceRegistrar = serviceRegistrar;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.observations = collector;
this.serviceName = serviceName;
this.instanceSelectionLock = requiresStrictRecording ? new Semaphore(1) : null;
}
Expand All @@ -41,21 +54,37 @@ public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery s
* Selects a service instance.
* <p>
* The selection looks for the service instances and select the one to use using the load balancer.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
* @return a Uni with a ServiceInstance, or with {@link NoServiceInstanceFoundException} if the load balancer failed to find
* a service instance capable of handling a call
*/
public Uni<ServiceInstance> selectInstance() {
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances()
.map(this::selectInstance);
.onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(this::selectInstance)
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
* Using the underlying load balancer, select a service instance from the collection of service instances.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
Expand All @@ -80,8 +109,23 @@ public ServiceInstance selectInstance(Collection<ServiceInstance> instances) {
* @see LoadBalancer#requiresStrictRecording()
*/
public Uni<ServiceInstance> selectInstanceAndRecordStart(boolean measureTime) {
return serviceDiscovery.getServiceInstances()
.map(list -> selectInstanceAndRecordStart(list, measureTime));
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances().onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(list -> selectInstanceAndRecordStart(list, measureTime))
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
Expand Down Expand Up @@ -158,6 +202,10 @@ public ServiceRegistrar getServiceRegistrar() {
return serviceRegistrar;
}

public ObservationCollector getObservations() {
return observations;
}

/**
* @return the service name.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.stork.api.observability;

import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class NoopObservationCollector implements ObservationCollector {

private static final StorkEventHandler NOOP_HANDLER = ev -> {
// NOOP
};

public static final StorkObservation NOOP_STORK_EVENT = new StorkObservation(
null, null,
null, NOOP_HANDLER) {
@Override
public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
// Noop
}

@Override
public void onServiceDiscoveryFailure(Throwable throwable) {
// Noop
}

@Override
public void onServiceSelectionSuccess(long id) {
// Noop
}

@Override
public void onServiceSelectionFailure(Throwable throwable) {
// Noop
}
};

@Override
public StorkObservation create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
return NOOP_STORK_EVENT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.smallrye.stork.api.observability;

public interface ObservationCollector {

StorkObservation create(String serviceName, String serviceDiscoveryType, String serviceSelectionType);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.smallrye.stork.api.observability;

public interface StorkEventHandler {
void complete(StorkObservation event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.smallrye.stork.api.observability;

import java.time.Duration;
import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class StorkObservation {
// Handler / Reporter
private final StorkEventHandler handler;

// Metadata
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;

// Time
private final long begin;
private volatile long endOfServiceDiscovery;
private volatile long endOfServiceSelection;

// Service discovery data
private volatile int instancesCount = -1;

// Service selection data
private volatile long selectedInstanceId = -1L;

// Overall status
private volatile boolean done;
private volatile boolean serviceDiscoverySuccessful = false;
private volatile Throwable failure;

public StorkObservation(String serviceName, String serviceDiscoveryType, String serviceSelectionType,
StorkEventHandler handler) {
this.handler = handler;
this.serviceName = serviceName;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.begin = System.nanoTime();
}

public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
this.endOfServiceDiscovery = System.nanoTime();
this.serviceDiscoverySuccessful = true;
if (instances != null) {
this.instancesCount = instances.size();
} else {
this.instancesCount = 0;
}
}

public void onServiceDiscoveryFailure(Throwable throwable) {
this.endOfServiceDiscovery = System.nanoTime();
this.failure = throwable;
}

public void onServiceSelectionSuccess(long id) {
this.endOfServiceSelection = System.nanoTime();
this.selectedInstanceId = id;
this.done = true;
this.handler.complete(this);
}

public void onServiceSelectionFailure(Throwable throwable) {
this.endOfServiceSelection = System.nanoTime();
if (failure != throwable) {
this.failure = throwable;
}
this.handler.complete(this);
}

public boolean isDone() {
return done || failure != null;
}

public Duration getOverallDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - begin);
}

public Duration getServiceDiscoveryDuration() {
return Duration.ofNanos(endOfServiceDiscovery - begin);
}

public Duration getServiceSelectionDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - endOfServiceDiscovery);
}

public String getServiceName() {
return serviceName;
}

public String getServiceDiscoveryType() {
return serviceDiscoveryType;
}

public String getServiceSelectionType() {
return serviceSelectionType;
}

public int getDiscoveredInstancesCount() {
return instancesCount;
}

public Throwable failure() {
return failure;
}

public boolean isServiceDiscoverySuccessful() {
return serviceDiscoverySuccessful;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.util.function.Supplier;

import io.smallrye.stork.api.observability.NoopObservationCollector;
import io.smallrye.stork.api.observability.ObservationCollector;

/**
* A provider for "utility" objects used by service discovery and load balancer implementations.
*
Expand All @@ -24,4 +27,8 @@ public interface StorkInfrastructure {
* @throws NullPointerException if utilityClass or defaultSupplier are null
*/
<T> T get(Class<T> utilityClass, Supplier<T> defaultSupplier);

default ObservationCollector getObservationCollector() {
return new NoopObservationCollector();
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/io/smallrye/stork/Stork.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,15 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader

final var loadBalancerConfig = serviceConfig.loadBalancer();
final LoadBalancer loadBalancer;
String loadBalancerType;
if (loadBalancerConfig == null) {
// no load balancer, use round-robin
LOGGER.debug("No load balancer configured for type {}, using {}", serviceDiscoveryType,
RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE);
loadBalancerType = RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE;
loadBalancer = new RoundRobinLoadBalancer();
} else {
String loadBalancerType = loadBalancerConfig.type();
loadBalancerType = loadBalancerConfig.type();
final var loadBalancerProvider = loadBalancerLoaders.get(loadBalancerType);
if (loadBalancerProvider == null) {
throw new IllegalArgumentException("No LoadBalancerProvider for type " + loadBalancerType);
Expand All @@ -223,7 +225,7 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
}

final var serviceRegistrarConfig = serviceConfig.serviceRegistrar();
ServiceRegistrar serviceRegistrar = null;
ServiceRegistrar<?> serviceRegistrar = null;
if (serviceRegistrarConfig == null) {
LOGGER.debug("No service registrar configured for service {}", serviceConfig.serviceName());
} else {
Expand All @@ -237,7 +239,9 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
serviceConfig.serviceName(), infrastructure);
}

return new Service(serviceConfig.serviceName(), loadBalancer, serviceDiscovery, serviceRegistrar,
return new Service(serviceConfig.serviceName(),
loadBalancerType, serviceDiscoveryType, infrastructure.getObservationCollector(),
loadBalancer, serviceDiscovery, serviceRegistrar,
loadBalancer.requiresStrictRecording());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.smallrye.stork.integration;

import io.smallrye.stork.api.observability.ObservationCollector;

public class ObservableStorkInfrastructure extends DefaultStorkInfrastructure {

private final ObservationCollector observationCollector;

public ObservableStorkInfrastructure(ObservationCollector observationCollector) {
this.observationCollector = observationCollector;
}

@Override
public ObservationCollector getObservationCollector() {
return observationCollector;
}
}
Loading