Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SessionIdEventSender and wire up into lifecycle #571

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import android.app.Application;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;
import io.opentelemetry.android.common.RumConstants;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
Expand All @@ -31,6 +32,7 @@
import io.opentelemetry.android.internal.services.ServiceManagerImpl;
import io.opentelemetry.android.internal.services.periodicwork.PeriodicWorkService;
import io.opentelemetry.android.session.SessionManager;
import io.opentelemetry.android.session.SessionManagerImpl;
import io.opentelemetry.android.session.SessionProvider;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
Expand Down Expand Up @@ -95,6 +97,7 @@ public final class OpenTelemetryRumBuilder {
private Resource resource;

@Nullable private ServiceManager serviceManager;
@Nullable private SessionManager sessionManager;
@Nullable private ExportScheduleHandler exportScheduleHandler;

private static TextMapPropagator buildDefaultPropagator() {
Expand Down Expand Up @@ -306,8 +309,10 @@ public OpenTelemetryRum build() {
}
initializationEvents.spanExporterInitialized(spanExporter);

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());
if (sessionManager == null) {
sessionManager =
SessionManagerImpl.create(timeoutHandler, config.getSessionTimeout().toNanos());
}

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
Expand Down Expand Up @@ -348,6 +353,12 @@ public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager)
return this;
}

@VisibleForTesting
OpenTelemetryRumBuilder setSessionManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
return this;
}

/**
* Sets a scheduler that will take care of periodically read data stored in disk and export it.
* If not specified, the default schedule exporter will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.opentelemetry.android.instrumentation.AndroidInstrumentationLoader
import io.opentelemetry.android.instrumentation.InstallationContext
import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.session.SessionManager
import io.opentelemetry.android.session.SessionManagerImpl
import io.opentelemetry.sdk.OpenTelemetrySdk

class SdkPreconfiguredRumBuilder
Expand All @@ -19,7 +20,7 @@ class SdkPreconfiguredRumBuilder
private val application: Application,
private val sdk: OpenTelemetrySdk,
private val timeoutHandler: SessionIdTimeoutHandler = SessionIdTimeoutHandler(),
private val sessionManager: SessionManager = SessionManager(timeoutHandler = timeoutHandler),
private val sessionManager: SessionManager = SessionManagerImpl(timeoutHandler = timeoutHandler),
private val discoverInstrumentations: Boolean,
private val serviceManager: ServiceManager,
) {
Expand Down Expand Up @@ -54,11 +55,20 @@ class SdkPreconfiguredRumBuilder
val openTelemetryRum = OpenTelemetryRumImpl(sdk, sessionManager)

// Install instrumentations
val ctx = InstallationContext(application, openTelemetryRum.openTelemetry, serviceManager)
val ctx =
InstallationContext(
application = application,
openTelemetry = openTelemetryRum.openTelemetry,
sessionManager = sessionManager,
serviceManager = serviceManager,
)
for (instrumentation in getInstrumentations()) {
instrumentation.install(ctx)
}

// After installing all instrumentations, we call getSessionId() to trigger the session start
sessionManager.getSessionId()

return openTelemetryRum
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,9 @@

package io.opentelemetry.android.session

import io.opentelemetry.android.SessionIdTimeoutHandler
import io.opentelemetry.sdk.common.Clock
import java.util.Collections.synchronizedList
import java.util.concurrent.TimeUnit

internal class SessionManager(
private val clock: Clock = Clock.getDefault(),
private val sessionStorage: SessionStorage = SessionStorage.InMemory(),
private val timeoutHandler: SessionIdTimeoutHandler,
private val idGenerator: SessionIdGenerator = SessionIdGenerator.DEFAULT,
private val sessionLifetimeNanos: Long = TimeUnit.HOURS.toNanos(4),
) : SessionProvider, SessionPublisher {
// TODO: Make thread safe / wrap with AtomicReference?
private var session: Session = Session.NONE
private val observers = synchronizedList(ArrayList<SessionObserver>())

init {
sessionStorage.save(session)
}

override fun addObserver(observer: SessionObserver) {
observers.add(observer)
}

override fun getSessionId(): String {
// value will never be null
var newSession = session

if (sessionHasExpired() || timeoutHandler.hasTimedOut()) {
val newId = idGenerator.generateSessionId()

// TODO FIXME: This is not threadsafe -- if two threads call getSessionId()
// at the same time while timed out, two new sessions are created
// Could require SessionStorage impls to be atomic/threadsafe or
// do the locking in this class?

newSession = Session.DefaultSession(newId, clock.now())
sessionStorage.save(newSession)
}

timeoutHandler.bump()

// observers need to be called after bumping the timer because it may
// create a new span
if (newSession != session) {
observers.forEach {
it.onSessionEnded(session)
it.onSessionStarted(newSession, session)
}
session = newSession
}
return session.getId()
}

private fun sessionHasExpired(): Boolean {
val elapsedTime = clock.now() - session.getStartTimestamp()
return elapsedTime >= sessionLifetimeNanos
}

companion object {
@JvmStatic
fun create(
timeoutHandler: SessionIdTimeoutHandler,
sessionLifetimeNanos: Long,
): SessionManager {
return SessionManager(
timeoutHandler = timeoutHandler,
sessionLifetimeNanos = sessionLifetimeNanos,
)
}
}
}
/**
* The SessionManager is a public-facing tag interface that brings together
* the SessionProvider and SessionPublisher interfaces under a common
* name.
*/
interface SessionManager : SessionProvider, SessionPublisher
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.session

import io.opentelemetry.android.SessionIdTimeoutHandler
import io.opentelemetry.sdk.common.Clock
import java.util.Collections.synchronizedList
import java.util.concurrent.TimeUnit

internal class SessionManagerImpl(
private val clock: Clock = Clock.getDefault(),
private val sessionStorage: SessionStorage = SessionStorage.InMemory(),
private val timeoutHandler: SessionIdTimeoutHandler,
private val idGenerator: SessionIdGenerator = SessionIdGenerator.DEFAULT,
private val sessionLifetimeNanos: Long = TimeUnit.HOURS.toNanos(4),
) : SessionManager {
// TODO: Make thread safe / wrap with AtomicReference?
private var session: Session = Session.NONE
private val observers = synchronizedList(ArrayList<SessionObserver>())

init {
sessionStorage.save(session)
}

override fun addObserver(observer: SessionObserver) {
observers.add(observer)
}

override fun getSessionId(): String {
// value will never be null
var newSession = session

if (sessionHasExpired() || timeoutHandler.hasTimedOut()) {
val newId = idGenerator.generateSessionId()

// TODO FIXME: This is not threadsafe -- if two threads call getSessionId()
// at the same time while timed out, two new sessions are created
// Could require SessionStorage impls to be atomic/threadsafe or
// do the locking in this class?

newSession = Session.DefaultSession(newId, clock.now())
sessionStorage.save(newSession)
}

timeoutHandler.bump()

// observers need to be called after bumping the timer because it may
// create a new span
if (newSession != session) {
val oldSession = session
session = newSession
observers.forEach {
it.onSessionEnded(oldSession)
it.onSessionStarted(session, oldSession)
}
}
return session.getId()
}

private fun sessionHasExpired(): Boolean {
val elapsedTime = clock.now() - session.getStartTimestamp()
return elapsedTime >= sessionLifetimeNanos
}

companion object {
@JvmStatic
fun create(
timeoutHandler: SessionIdTimeoutHandler,
sessionLifetimeNanos: Long,
): SessionManagerImpl {
return SessionManagerImpl(
timeoutHandler = timeoutHandler,
sessionLifetimeNanos = sessionLifetimeNanos,
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import io.opentelemetry.android.internal.services.applifecycle.AppLifecycleService;
import io.opentelemetry.android.internal.services.applifecycle.ApplicationStateListener;
import io.opentelemetry.android.internal.services.visiblescreen.VisibleScreenService;
import io.opentelemetry.android.session.SessionManager;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.KeyValue;
import io.opentelemetry.api.common.Value;
Expand All @@ -60,7 +62,6 @@
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand All @@ -69,6 +70,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -192,6 +194,7 @@ public void shouldBuildLogRecordProvider() {
@Test
public void shouldInstallInstrumentation() {
ServiceManager serviceManager = createServiceManager();
SessionManager sessionManager = mock(SessionManager.class);
SessionIdTimeoutHandler timeoutHandler = mock();
AndroidInstrumentation localInstrumentation = mock();
AndroidInstrumentation classpathInstrumentation = mock();
Expand All @@ -203,19 +206,23 @@ public void shouldInstallInstrumentation() {
new OpenTelemetryRumBuilder(application, buildConfig(), timeoutHandler)
.addInstrumentation(localInstrumentation)
.setServiceManager(serviceManager)
.setSessionManager(sessionManager)
.build();

verify(serviceManager.getAppLifecycleService()).registerListener(timeoutHandler);

InstallationContext expectedCtx =
new InstallationContext(application, rum.getOpenTelemetry(), serviceManager);
new InstallationContext(
application, rum.getOpenTelemetry(), sessionManager, serviceManager);

verify(localInstrumentation).install(eq(expectedCtx));
verify(classpathInstrumentation).install(eq(expectedCtx));
}

@Test
public void shouldInstallInstrumentation_excludingClasspathImplsWhenRequestedInConfig() {
ServiceManager serviceManager = createServiceManager();
SessionManager sessionManager = mock(SessionManager.class);
SessionIdTimeoutHandler timeoutHandler = mock();
AndroidInstrumentation localInstrumentation = mock();
AndroidInstrumentation classpathInstrumentation = mock();
Expand All @@ -230,12 +237,14 @@ public void shouldInstallInstrumentation_excludingClasspathImplsWhenRequestedInC
timeoutHandler)
.addInstrumentation(localInstrumentation)
.setServiceManager(serviceManager)
.setSessionManager(sessionManager)
.build();

verify(serviceManager.getAppLifecycleService()).registerListener(timeoutHandler);

InstallationContext expectedCtx =
new InstallationContext(application, rum.getOpenTelemetry(), serviceManager);
new InstallationContext(
application, rum.getOpenTelemetry(), sessionManager, serviceManager);
verify(localInstrumentation).install(eq(expectedCtx));
verifyNoInteractions(classpathInstrumentation);
}
Expand Down Expand Up @@ -319,7 +328,8 @@ public void setLogRecordExporterCustomizer() {
assertThat(wasCalled.get()).isTrue();
Collection<LogRecordData> logs = logsExporter.getFinishedLogRecordItems();
assertThat(logs).hasSize(1);
assertThat(logs.iterator().next())
Iterator<LogRecordData> iter = logs.iterator();
assertThat(iter.next())
.hasBody("foo")
.hasAttributesSatisfyingExactly(
equalTo(stringKey("bing"), "bang"),
Expand Down Expand Up @@ -415,8 +425,8 @@ public void diskBufferingDisabled() {
public void verifyGlobalAttrsForLogs() {
ServiceManager serviceManager = createServiceManager();
OtelRumConfig otelRumConfig = buildConfig();
otelRumConfig.setGlobalAttributes(
() -> Attributes.of(stringKey("someGlobalKey"), "someGlobalValue"));
AttributeKey<String> globalKey = stringKey("someGlobalKey");
otelRumConfig.setGlobalAttributes(() -> Attributes.of(globalKey, "someGlobalValue"));

OpenTelemetryRum rum =
OpenTelemetryRum.builder(application, otelRumConfig)
Expand All @@ -431,16 +441,13 @@ public void verifyGlobalAttrsForLogs() {
logger.logRecordBuilder().setAttribute(stringKey("localAttrKey"), "localAttrValue").emit();

List<LogRecordData> recordedLogs = logRecordExporter.getFinishedLogRecordItems();
assertThat(recordedLogs).hasSize(1);
LogRecordData logRecordData = recordedLogs.get(0);
OpenTelemetryAssertions.assertThat(logRecordData)
.hasAttributes(
Attributes.builder()
.put(SESSION_ID, rum.getRumSessionId())
.put("someGlobalKey", "someGlobalValue")
.put("localAttrKey", "localAttrValue")
.put(SCREEN_NAME_KEY, CUR_SCREEN_NAME)
.build());
assertThat(recordedLogs).hasSize(1); // session start, the the above log
assertThat(recordedLogs.get(0))
.hasAttributesSatisfyingExactly(
equalTo(SESSION_ID, rum.getRumSessionId()),
equalTo(globalKey, "someGlobalValue"),
equalTo(stringKey("localAttrKey"), "localAttrValue"),
equalTo(SCREEN_NAME_KEY, CUR_SCREEN_NAME));
}

@Test
Expand Down
Loading
Loading