Skip to content

Commit

Permalink
move session instrumentation into its own instrumentation module.
Browse files Browse the repository at this point in the history
  • Loading branch information
breedx-splk committed Dec 11, 2024
1 parent dd58005 commit ce8fc85
Show file tree
Hide file tree
Showing 19 changed files with 307 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import android.app.Application;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;
import io.opentelemetry.android.common.RumConstants;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
Expand All @@ -31,6 +32,7 @@
import io.opentelemetry.android.internal.services.ServiceManagerImpl;
import io.opentelemetry.android.internal.services.periodicwork.PeriodicWorkService;
import io.opentelemetry.android.session.SessionManager;
import io.opentelemetry.android.session.SessionManagerImpl;
import io.opentelemetry.android.session.SessionProvider;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
Expand Down Expand Up @@ -95,6 +97,7 @@ public final class OpenTelemetryRumBuilder {
private Resource resource;

@Nullable private ServiceManager serviceManager;
@Nullable private SessionManager sessionManager;
@Nullable private ExportScheduleHandler exportScheduleHandler;

private static TextMapPropagator buildDefaultPropagator() {
Expand Down Expand Up @@ -306,8 +309,10 @@ public OpenTelemetryRum build() {
}
initializationEvents.spanExporterInitialized(spanExporter);

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());
if (sessionManager == null) {
sessionManager =
SessionManagerImpl.create(timeoutHandler, config.getSessionTimeout().toNanos());
}

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
Expand Down Expand Up @@ -348,6 +353,12 @@ public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager)
return this;
}

@VisibleForTesting
OpenTelemetryRumBuilder setSessionManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
return this;
}

/**
* Sets a scheduler that will take care of periodically read data stored in disk and export it.
* If not specified, the default schedule exporter will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import io.opentelemetry.android.instrumentation.AndroidInstrumentationLoader
import io.opentelemetry.android.instrumentation.InstallationContext
import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.session.SessionManager
import io.opentelemetry.android.session.SessionManagerImpl
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider

class SdkPreconfiguredRumBuilder
@JvmOverloads
internal constructor(
private val application: Application,
private val sdk: OpenTelemetrySdk,
private val timeoutHandler: SessionIdTimeoutHandler = SessionIdTimeoutHandler(),
private val sessionManager: SessionManager = SessionManager(timeoutHandler = timeoutHandler),
private val sessionManager: SessionManager = SessionManagerImpl(timeoutHandler = timeoutHandler),
private val discoverInstrumentations: Boolean,
private val serviceManager: ServiceManager,
) {
Expand Down Expand Up @@ -52,22 +52,23 @@ class SdkPreconfiguredRumBuilder
// might turn off/on additional telemetry depending on whether the app is active or not
appLifecycleService.registerListener(timeoutHandler)

val eventLogger =
SdkEventLoggerProvider.create(sdk.logsBridge)
.get(OpenTelemetryRum::class.java.simpleName)

sessionManager.addObserver(SessionIdEventSender(eventLogger))
// After addObserver(), we call getSessionId() to trigger a session.start event
sessionManager.getSessionId()

val openTelemetryRum = OpenTelemetryRumImpl(sdk, sessionManager)

// Install instrumentations
val ctx = InstallationContext(application, openTelemetryRum.openTelemetry, serviceManager)
val ctx =
InstallationContext(
application = application,
openTelemetry = openTelemetryRum.openTelemetry,
sessionManager = sessionManager,
serviceManager = serviceManager,
)
for (instrumentation in getInstrumentations()) {
instrumentation.install(ctx)
}

// After installing all instrumentations, we call getSessionId() to trigger the session start
sessionManager.getSessionId()

return openTelemetryRum
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package io.opentelemetry.android.instrumentation

import android.app.Application
import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.session.SessionManager
import io.opentelemetry.api.OpenTelemetry

data class InstallationContext(
val application: Application,
val openTelemetry: OpenTelemetry,
val sessionManager: SessionManager,
val serviceManager: ServiceManager,
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,9 @@

package io.opentelemetry.android.session

import io.opentelemetry.android.SessionIdTimeoutHandler
import io.opentelemetry.sdk.common.Clock
import java.util.Collections.synchronizedList
import java.util.concurrent.TimeUnit

internal class SessionManager(
private val clock: Clock = Clock.getDefault(),
private val sessionStorage: SessionStorage = SessionStorage.InMemory(),
private val timeoutHandler: SessionIdTimeoutHandler,
private val idGenerator: SessionIdGenerator = SessionIdGenerator.DEFAULT,
private val sessionLifetimeNanos: Long = TimeUnit.HOURS.toNanos(4),
) : SessionProvider, SessionPublisher {
// TODO: Make thread safe / wrap with AtomicReference?
private var session: Session = Session.NONE
private val observers = synchronizedList(ArrayList<SessionObserver>())

init {
sessionStorage.save(session)
}

override fun addObserver(observer: SessionObserver) {
observers.add(observer)
}

override fun getSessionId(): String {
// value will never be null
var newSession = session

if (sessionHasExpired() || timeoutHandler.hasTimedOut()) {
val newId = idGenerator.generateSessionId()

// TODO FIXME: This is not threadsafe -- if two threads call getSessionId()
// at the same time while timed out, two new sessions are created
// Could require SessionStorage impls to be atomic/threadsafe or
// do the locking in this class?

newSession = Session.DefaultSession(newId, clock.now())
sessionStorage.save(newSession)
}

timeoutHandler.bump()

// observers need to be called after bumping the timer because it may
// create a new span
if (newSession != session) {
val oldSession = session
session = newSession
observers.forEach {
it.onSessionEnded(oldSession)
it.onSessionStarted(session, oldSession)
}
}
return session.getId()
}

private fun sessionHasExpired(): Boolean {
val elapsedTime = clock.now() - session.getStartTimestamp()
return elapsedTime >= sessionLifetimeNanos
}

companion object {
@JvmStatic
fun create(
timeoutHandler: SessionIdTimeoutHandler,
sessionLifetimeNanos: Long,
): SessionManager {
return SessionManager(
timeoutHandler = timeoutHandler,
sessionLifetimeNanos = sessionLifetimeNanos,
)
}
}
}
/**
* The SessionManager is a public-facing tag interface that brings together
* the SessionProvider and SessionPublisher interfaces under a common
* name.
*/
interface SessionManager : SessionProvider, SessionPublisher
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.session

import io.opentelemetry.android.SessionIdTimeoutHandler
import io.opentelemetry.sdk.common.Clock
import java.util.Collections.synchronizedList
import java.util.concurrent.TimeUnit

internal class SessionManagerImpl(
private val clock: Clock = Clock.getDefault(),
private val sessionStorage: SessionStorage = SessionStorage.InMemory(),
private val timeoutHandler: SessionIdTimeoutHandler,
private val idGenerator: SessionIdGenerator = SessionIdGenerator.DEFAULT,
private val sessionLifetimeNanos: Long = TimeUnit.HOURS.toNanos(4),
) : SessionManager {
// TODO: Make thread safe / wrap with AtomicReference?
private var session: Session = Session.NONE
private val observers = synchronizedList(ArrayList<SessionObserver>())

init {
sessionStorage.save(session)
}

override fun addObserver(observer: SessionObserver) {
observers.add(observer)
}

override fun getSessionId(): String {
// value will never be null
var newSession = session

if (sessionHasExpired() || timeoutHandler.hasTimedOut()) {
val newId = idGenerator.generateSessionId()

// TODO FIXME: This is not threadsafe -- if two threads call getSessionId()
// at the same time while timed out, two new sessions are created
// Could require SessionStorage impls to be atomic/threadsafe or
// do the locking in this class?

newSession = Session.DefaultSession(newId, clock.now())
sessionStorage.save(newSession)
}

timeoutHandler.bump()

// observers need to be called after bumping the timer because it may
// create a new span
if (newSession != session) {
val oldSession = session
session = newSession
observers.forEach {
it.onSessionEnded(oldSession)
it.onSessionStarted(session, oldSession)
}
}
return session.getId()
}

private fun sessionHasExpired(): Boolean {
val elapsedTime = clock.now() - session.getStartTimestamp()
return elapsedTime >= sessionLifetimeNanos
}

companion object {
@JvmStatic
fun create(
timeoutHandler: SessionIdTimeoutHandler,
sessionLifetimeNanos: Long,
): SessionManagerImpl {
return SessionManagerImpl(
timeoutHandler = timeoutHandler,
sessionLifetimeNanos = sessionLifetimeNanos,
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.incubating.EventIncubatingAttributes.EVENT_NAME;
import static io.opentelemetry.semconv.incubating.SessionIncubatingAttributes.SESSION_ID;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyCollection;
Expand Down Expand Up @@ -43,6 +42,7 @@
import io.opentelemetry.android.internal.services.applifecycle.AppLifecycleService;
import io.opentelemetry.android.internal.services.applifecycle.ApplicationStateListener;
import io.opentelemetry.android.internal.services.visiblescreen.VisibleScreenService;
import io.opentelemetry.android.session.SessionManager;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.KeyValue;
Expand Down Expand Up @@ -175,21 +175,16 @@ public void shouldBuildLogRecordProvider() {
eventLogger.builder("test.event").put("body.field", "foo").setAttributes(attrs).emit();

List<LogRecordData> logs = logsExporter.getFinishedLogRecordItems();
assertThat(logs).hasSize(2);
assertThat(logs).hasSize(1);
assertThat(logs.get(0))
.hasAttributesSatisfyingExactly(
equalTo(SESSION_ID, openTelemetryRum.getRumSessionId()),
equalTo(SCREEN_NAME_KEY, CUR_SCREEN_NAME),
equalTo(stringKey("event.name"), "session.start"));
assertThat(logs.get(1))
.hasAttributesSatisfyingExactly(
equalTo(SESSION_ID, openTelemetryRum.getRumSessionId()),
equalTo(stringKey("event.name"), "test.event"),
equalTo(SCREEN_NAME_KEY, CUR_SCREEN_NAME),
equalTo(stringKey("mega"), "hit"))
.hasResource(resource);

Value<?> bodyValue = logs.get(1).getBodyValue();
Value<?> bodyValue = logs.get(0).getBodyValue();
List<KeyValue> payload = (List<KeyValue>) bodyValue.getValue();
assertThat(payload).hasSize(1);
KeyValue expected = KeyValue.of("body.field", Value.of("foo"));
Expand All @@ -199,6 +194,7 @@ public void shouldBuildLogRecordProvider() {
@Test
public void shouldInstallInstrumentation() {
ServiceManager serviceManager = createServiceManager();
SessionManager sessionManager = mock(SessionManager.class);
SessionIdTimeoutHandler timeoutHandler = mock();
AndroidInstrumentation localInstrumentation = mock();
AndroidInstrumentation classpathInstrumentation = mock();
Expand All @@ -210,19 +206,23 @@ public void shouldInstallInstrumentation() {
new OpenTelemetryRumBuilder(application, buildConfig(), timeoutHandler)
.addInstrumentation(localInstrumentation)
.setServiceManager(serviceManager)
.setSessionManager(sessionManager)
.build();

verify(serviceManager.getAppLifecycleService()).registerListener(timeoutHandler);

InstallationContext expectedCtx =
new InstallationContext(application, rum.getOpenTelemetry(), serviceManager);
new InstallationContext(
application, rum.getOpenTelemetry(), sessionManager, serviceManager);

verify(localInstrumentation).install(eq(expectedCtx));
verify(classpathInstrumentation).install(eq(expectedCtx));
}

@Test
public void shouldInstallInstrumentation_excludingClasspathImplsWhenRequestedInConfig() {
ServiceManager serviceManager = createServiceManager();
SessionManager sessionManager = mock(SessionManager.class);
SessionIdTimeoutHandler timeoutHandler = mock();
AndroidInstrumentation localInstrumentation = mock();
AndroidInstrumentation classpathInstrumentation = mock();
Expand All @@ -237,12 +237,14 @@ public void shouldInstallInstrumentation_excludingClasspathImplsWhenRequestedInC
timeoutHandler)
.addInstrumentation(localInstrumentation)
.setServiceManager(serviceManager)
.setSessionManager(sessionManager)
.build();

verify(serviceManager.getAppLifecycleService()).registerListener(timeoutHandler);

InstallationContext expectedCtx =
new InstallationContext(application, rum.getOpenTelemetry(), serviceManager);
new InstallationContext(
application, rum.getOpenTelemetry(), sessionManager, serviceManager);
verify(localInstrumentation).install(eq(expectedCtx));
verifyNoInteractions(classpathInstrumentation);
}
Expand Down Expand Up @@ -325,13 +327,8 @@ public void setLogRecordExporterCustomizer() {
() -> assertThat(logsExporter.getFinishedLogRecordItems()).isNotEmpty());
assertThat(wasCalled.get()).isTrue();
Collection<LogRecordData> logs = logsExporter.getFinishedLogRecordItems();
assertThat(logs).hasSize(2);
assertThat(logs).hasSize(1);
Iterator<LogRecordData> iter = logs.iterator();
assertThat(iter.next())
.hasAttributesSatisfyingExactly(
equalTo(SESSION_ID, rum.getRumSessionId()),
equalTo(SCREEN_NAME_KEY, CUR_SCREEN_NAME),
equalTo(stringKey("event.name"), "session.start"));
assertThat(iter.next())
.hasBody("foo")
.hasAttributesSatisfyingExactly(
Expand Down Expand Up @@ -444,14 +441,8 @@ public void verifyGlobalAttrsForLogs() {
logger.logRecordBuilder().setAttribute(stringKey("localAttrKey"), "localAttrValue").emit();

List<LogRecordData> recordedLogs = logRecordExporter.getFinishedLogRecordItems();
assertThat(recordedLogs).hasSize(2); // session start, the the above log
assertThat(recordedLogs).hasSize(1); // session start, the the above log
assertThat(recordedLogs.get(0))
.hasAttributesSatisfyingExactly(
equalTo(EVENT_NAME, "session.start"),
equalTo(globalKey, "someGlobalValue"),
equalTo(SESSION_ID, rum.getRumSessionId()),
equalTo(SCREEN_NAME_KEY, CUR_SCREEN_NAME));
assertThat(recordedLogs.get(1))
.hasAttributesSatisfyingExactly(
equalTo(SESSION_ID, rum.getRumSessionId()),
equalTo(globalKey, "someGlobalValue"),
Expand Down
Loading

0 comments on commit ce8fc85

Please sign in to comment.