diff --git a/application-mode/build.gradle b/application-mode/build.gradle index 5a5b7827e61..bdba10e4bc9 100644 --- a/application-mode/build.gradle +++ b/application-mode/build.gradle @@ -4,7 +4,9 @@ plugins { } dependencies { + implementation project(':Base') implementation project(':Integrations') + implementation project(':engine-updategraph') Classpaths.inheritImmutables(project) diff --git a/application-mode/src/main/java/io/deephaven/appmode/ApplicationState.java b/application-mode/src/main/java/io/deephaven/appmode/ApplicationState.java index caeb9804bef..7550ec2171e 100644 --- a/application-mode/src/main/java/io/deephaven/appmode/ApplicationState.java +++ b/application-mode/src/main/java/io/deephaven/appmode/ApplicationState.java @@ -11,7 +11,11 @@ import java.util.Objects; import java.util.ServiceLoader; -public class ApplicationState { +import io.deephaven.engine.liveness.LivenessReferent; +import io.deephaven.engine.liveness.LivenessScope; +import io.deephaven.engine.updategraph.DynamicNode; + +public class ApplicationState extends LivenessScope { public interface Factory { @@ -75,12 +79,17 @@ public synchronized void setField(String name, T value, String description) } public synchronized void setField(Field field) { - Field oldField = fields.remove(field.name()); - if (oldField != null) { - listener.onRemoveField(this, field); + // manage the new value before release the old value + final Object newValue = field.value(); + if ((newValue instanceof LivenessReferent) && DynamicNode.notDynamicOrIsRefreshing(newValue)) { + manage((LivenessReferent) newValue); } - listener.onNewField(this, field); + + // remove and release the old value + removeField(field.name()); + fields.put(field.name(), field); + listener.onNewField(this, field); } public synchronized void setFields(Field... fields) { @@ -97,6 +106,10 @@ public synchronized void removeField(String name) { Field field = fields.remove(name); if (field != null) { listener.onRemoveField(this, field); + Object oldValue = field.value(); + if ((oldValue instanceof LivenessReferent) && DynamicNode.notDynamicOrIsRefreshing(oldValue)) { + unmanage((LivenessReferent) oldValue); + } } } diff --git a/engine/updategraph/build.gradle b/engine/updategraph/build.gradle index 1194f5ed0a9..538a007fc60 100644 --- a/engine/updategraph/build.gradle +++ b/engine/updategraph/build.gradle @@ -6,6 +6,7 @@ plugins { description 'Engine Update Graph: Core utilities for maintaining a DAG for update processing' dependencies { + api project(':Util') api project(':qst') implementation project(':engine-chunk') diff --git a/plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java b/plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java index 3a2daf65cb7..f7e1b8513b2 100644 --- a/plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java +++ b/plugin/gc-app/src/main/java/io/deephaven/app/GcApplication.java @@ -4,12 +4,9 @@ import io.deephaven.appmode.ApplicationState; import io.deephaven.appmode.ApplicationState.Listener; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.liveness.LivenessScope; -import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.sources.ring.RingTableTools; import io.deephaven.stream.StreamToBlinkTableAdapter; -import io.deephaven.util.SafeCloseable; import javax.management.ListenerNotFoundException; import javax.management.Notification; @@ -46,14 +43,12 @@ public final class GcApplication implements ApplicationState.Factory, Notificati private static final String POOLS = "pools"; private static final String POOLS_STATS = "pools_stats"; - private static final String ENABLED = "io.deephaven.app.GcApplication.enabled"; - private static final String NOTIFICATION_INFO_ENABLED = "io.deephaven.app.GcApplication.notification_info.enabled"; - private static final String NOTIFICATION_INFO_STATS_ENABLED = - "io.deephaven.app.GcApplication.notification_info_stats.enabled"; - private static final String NOTIFICATION_INFO_RING_ENABLED = - "io.deephaven.app.GcApplication.notification_info_ring.enabled"; - private static final String POOLS_ENABLED = "io.deephaven.app.GcApplication.pools.enabled"; - private static final String POOLS_STATS_ENABLED = "io.deephaven.app.GcApplication.pools_stats.enabled"; + private static final String ENABLED = APP_ID + ".enabled"; + private static final String NOTIFICATION_INFO_ENABLED = APP_ID + ".notification_info.enabled"; + private static final String NOTIFICATION_INFO_STATS_ENABLED = APP_ID + ".notification_info_stats.enabled"; + private static final String NOTIFICATION_INFO_RING_ENABLED = APP_ID + ".notification_info_ring.enabled"; + private static final String POOLS_ENABLED = APP_ID + ".pools.enabled"; + private static final String POOLS_STATS_ENABLED = APP_ID + ".pools_stats.enabled"; /** * Looks up the system property {@value ENABLED}, defaults to {@code false}. @@ -113,8 +108,6 @@ public static boolean poolStatsEnabled() { private GcNotificationPublisher notificationInfoPublisher; private GcPoolsPublisher poolsPublisher; - @SuppressWarnings("FieldCanBeLocal") - private LivenessScope scope; @Override public void handleNotification(Notification notification, Object handback) { @@ -146,14 +139,11 @@ public ApplicationState create(Listener listener) { if (!notificationInfoEnabled() && !poolsEnabled()) { return state; } - scope = new LivenessScope(); - try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) { - if (notificationInfoEnabled) { - setNotificationInfo(state); - } - if (poolsEnabled) { - setPools(state); - } + if (notificationInfoEnabled) { + setNotificationInfo(state); + } + if (poolsEnabled) { + setPools(state); } install(); return state; diff --git a/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java b/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java index e5ab2edbbde..25553a696b3 100644 --- a/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java +++ b/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java @@ -85,7 +85,7 @@ private void loadApplication(final Path applicationDir, final ApplicationConfig log.info().append("Starting application '").append(config.toString()).append('\'').endl(); final ApplicationState app; - try (final SafeCloseable ignored = LivenessScopeStack.open(); + try (final SafeCloseable ignored1 = LivenessScopeStack.open(); final SafeCloseable ignored2 = scriptSessionProvider.get().getExecutionContext().open()) { app = ApplicationFactory.create(applicationDir, config, scriptSessionProvider.get(), applicationListener); } diff --git a/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java b/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java index e67a2f27447..7aaa4a4aaad 100644 --- a/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/appmode/ApplicationTicketResolver.java @@ -84,7 +84,7 @@ private SessionState.ExportObject resolve(final AppFieldId id, final Stri } Object value = authorization.transform(field.value()); // noinspection unchecked - return SessionState.wrapAsExport((T) value); + return SessionState.wrapAsExport((T) value, "ApplicationTicketResolver#resolve"); } @Override @@ -113,7 +113,7 @@ public SessionState.ExportObject flightInfoFor( } } - return SessionState.wrapAsExport(info); + return SessionState.wrapAsExport(info, "ApplicationTicketResolver#flightInfoFor"); } @Override diff --git a/server/src/main/java/io/deephaven/server/appmode/ApplicationsModule.java b/server/src/main/java/io/deephaven/server/appmode/ApplicationsModule.java index 616cd217607..e2725d341c2 100644 --- a/server/src/main/java/io/deephaven/server/appmode/ApplicationsModule.java +++ b/server/src/main/java/io/deephaven/server/appmode/ApplicationsModule.java @@ -8,6 +8,7 @@ import dagger.multibindings.IntoSet; import io.deephaven.app.GcApplication; import io.deephaven.appmode.ApplicationState; +import io.deephaven.server.grpc_api_app.GrpcApiApplication; @Module public interface ApplicationsModule { @@ -17,4 +18,10 @@ public interface ApplicationsModule { static ApplicationState.Factory providesGcApplication() { return new GcApplication(); } + + @Provides + @IntoSet + static ApplicationState.Factory providesGrpcApiApplication() { + return new GrpcApiApplication(); + } } diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 3b33ad46e57..222fd364166 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -53,8 +53,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.*; import static io.deephaven.extensions.barrage.util.BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS; @@ -80,6 +78,7 @@ public static void DoGetCustom( final long queueStartTm = System.nanoTime(); session.nonExport() + .description("FlightService#DoGet") .require(export) .onError(observer) .submit(() -> { @@ -154,6 +153,7 @@ public void onNext(final InputStream request) { flightDescriptor = mi.descriptor; resultExportBuilder = ticketRouter .publish(session, mi.descriptor, "Flight.Descriptor", null) + .description("ArrowFlight#DoPut") .onError(observer); } } @@ -463,6 +463,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { final long queueStartTm = System.nanoTime(); session.nonExport() + .description("FlightService#DoExchange(snapshot)") .require(parent) .onError(listener) .submit(() -> { @@ -563,6 +564,7 @@ public void handleMessage(@NotNull final MessageInfo message) { synchronized (this) { onExportResolvedContinuation = session.nonExport() + .description("FlightService#DoExchange(subscription)") .require(parent) .onErrorHandler(DoExchangeMarshaller.this::onError) .submit(() -> onExportResolved(parent)); diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index 66a7de0d37a..6a29a66820c 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -175,6 +175,7 @@ public void getFlightInfo( if (session != null) { session.nonExport() + .description("FlightService#getFlightInfo") .require(export) .onError(responseObserver) .submit(() -> { @@ -209,6 +210,7 @@ public void getSchema( if (session != null) { session.nonExport() + .description("FlightService#getSchema") .require(export) .onError(responseObserver) .submit(() -> { diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index e99e14f1efb..70900956d79 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -128,6 +128,7 @@ public void startConsole( } session.newExport(request.getResultId(), "resultId") + .description("ConsoleService#startConsole") .onError(responseObserver) .submit(() -> { final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get()); @@ -167,25 +168,33 @@ public void executeCommand( SessionState.ExportObject exportedConsole = ticketRouter.resolve(session, consoleId, "consoleId"); session.nonExport() + .description("ConsoleService#executeCommand") .requiresSerialQueue() .require(exportedConsole) .onError(responseObserver) .submit(() -> { - ScriptSession scriptSession = exportedConsole.get(); - ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); - ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); - FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); - changes.created.entrySet() - .forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry))); - changes.updated.entrySet() - .forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry))); - changes.removed.entrySet() - .forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry))); - if (changes.error != null) { - diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error)); - log.error().append("Error running script: ").append(changes.error).endl(); + // since we are on the serial queue, we can safely stash our session id someplace accessible + SessionService.CURRENT_SESSION_ID = session.getSessionId(); + + try { + ScriptSession scriptSession = exportedConsole.get(); + ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); + ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); + FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); + changes.created.entrySet() + .forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry))); + changes.updated.entrySet() + .forEach(entry -> fieldChanges.addUpdated(makeVariableDefinition(entry))); + changes.removed.entrySet() + .forEach(entry -> fieldChanges.addRemoved(makeVariableDefinition(entry))); + if (changes.error != null) { + diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error)); + log.error().append("Error running script: ").append(changes.error).endl(); + } + safelyComplete(responseObserver, diff.setChanges(fieldChanges).build()); + } finally { + SessionService.CURRENT_SESSION_ID = "NO SESSION IN USE"; } - safelyComplete(responseObserver, diff.setChanges(fieldChanges).build()); }); } @@ -244,6 +253,7 @@ public void bindTableToVariable( final SessionState.ExportObject exportedConsole; ExportBuilder exportBuilder = session.nonExport() + .description("ConsoleService#bindTableToVariable") .requiresSerialQueue() .onError(responseObserver); diff --git a/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java b/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java index 585b5534b67..89b2eef5875 100644 --- a/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/console/ScopeTicketResolver.java @@ -8,6 +8,7 @@ import io.deephaven.base.string.EncodingInfo; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.liveness.LivenessReferent; +import io.deephaven.engine.liveness.LivenessStateException; import io.deephaven.engine.table.Table; import io.deephaven.engine.updategraph.DynamicNode; import io.deephaven.engine.util.ScriptSession; @@ -75,7 +76,7 @@ public SessionState.ExportObject flightInfoFor( "Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'"); }); - return SessionState.wrapAsExport(flightInfo); + return SessionState.wrapAsExport(flightInfo, "ScopeTicketResolver#flightInfoFor"); } @Override @@ -106,6 +107,7 @@ private SessionState.ExportObject resolve( // fetch the variable from the scope right now T export = gss.getExecutionContext().getUpdateGraph().sharedLock().computeLocked(() -> { T scopeVar = null; + // note we do not need to synchronize on gss here because it cannot change while holding the ugp lock try { // noinspection unchecked scopeVar = (T) gss.unwrapObject(gss.getVariable(scopeName)); @@ -119,9 +121,20 @@ private SessionState.ExportObject resolve( if (export == null) { return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'")); + } else if (export instanceof SessionState.ExportObject) { + // noinspection unchecked + final SessionState.ExportObject asExportObject = (SessionState.ExportObject) export; + // ensure the result is live until the caller uses it + try { + asExportObject.manageWithCurrentScope(); + return asExportObject; + } catch (LivenessStateException ignored) { + return SessionState.wrapAsFailedExport(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, + "Could not resolve '" + logId + "': no variable exists with name '" + scopeName + "'")); + } } - return SessionState.wrapAsExport(export); + return SessionState.wrapAsExport(export, "ScopeTicketResolver#resolve"); } @Override @@ -153,11 +166,16 @@ private SessionState.ExportBuilder publish( final SessionState.ExportObject resultExport = resultBuilder.getExport(); final SessionState.ExportBuilder publishTask = session.nonExport(); + // if we receive requests to read from this variable before the client has finished publishing, we will + // give the user the result export object to wait on as a dependency. + final ScriptSession gss = scriptSessionProvider.get(); + gss.setVariable(varName, resultExport); + publishTask + .description("ScopeTicketResolver#publish(" + varName + ")") .requiresSerialQueue() .require(resultExport) .submit(() -> { - final ScriptSession gss = scriptSessionProvider.get(); T value = resultExport.get(); if (value instanceof LivenessReferent && DynamicNode.notDynamicOrIsRefreshing(value)) { gss.manage((LivenessReferent) value); @@ -168,6 +186,8 @@ private SessionState.ExportBuilder publish( } }); + publishTask.setStateToPublishing(); + resultBuilder.setStateToPublishing(); return resultBuilder; } diff --git a/server/src/main/java/io/deephaven/server/grpc_api_app/GrpcApiApplication.java b/server/src/main/java/io/deephaven/server/grpc_api_app/GrpcApiApplication.java new file mode 100644 index 00000000000..65071057a38 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/grpc_api_app/GrpcApiApplication.java @@ -0,0 +1,78 @@ +package io.deephaven.server.grpc_api_app; + +import io.deephaven.appmode.ApplicationState; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.sources.ring.RingTableTools; +import io.deephaven.engine.util.TableTools; +import io.deephaven.proto.backplane.grpc.ExportNotification; +import io.deephaven.server.session.SessionState; +import io.deephaven.stream.StreamToBlinkTableAdapter; +import io.grpc.stub.StreamObserver; + +/** + * The {@value APP_NAME}, application id {@value APP_ID}, produces stream {@link io.deephaven.engine.table.Table tables} + * {@value NOTIFICATION_INFO}; and derived table {@value NOTIFICATION_INFO_RING}. This data is modeled after the + * {@link ExportNotification} event information from {@link SessionState#addExportListener(StreamObserver)}. + * + * @see #ENABLED + * @see #RING_SIZE + */ +public final class GrpcApiApplication implements ApplicationState.Factory { + private static final String APP_ID = "io.deephaven.server.grpc_api_app.GrpcApiApplication"; + private static final String APP_NAME = "GRPC API Application"; + private static final String NOTIFICATION_INFO = "session_export_notification_info"; + private static final String NOTIFICATION_INFO_RING = "session_export_notification_info_ring"; + + private static final String ENABLED = "enabled"; + private static final String RING_SIZE = "ringSize"; + + private static Table blinkTable = TableTools.emptyTable(0); + + public static Table getBlinkTable() { + return blinkTable; + } + + /** + * Looks up the system property {@value ENABLED}, defaults to {@code false}. + * + * @return if the gRPC API application is enabled + */ + private static boolean enabled() { + return "true".equals(System.getProperty(APP_ID + "." + ENABLED)); + } + + /** + * Looks up the system property {@value RING_SIZE}, defaults to {@code 1024}. The + * {@value NOTIFICATION_INFO_RING} table is disabled when {@code 0} or less. + * + * @return the {@value NOTIFICATION_INFO_RING} table size + */ + private static int ringSize() { + return Integer.getInteger(APP_ID + "." + RING_SIZE, 1024); + } + + @Override + public ApplicationState create(final ApplicationState.Listener listener) { + final ApplicationState state = new ApplicationState(listener, APP_ID, APP_NAME); + if (!enabled()) { + return state; + } + + final SessionStateExportObjectUpdateStreamPublisher updateStream = new SessionStateExportObjectUpdateStreamPublisher(); + SessionState.LISTENER = updateStream::onExportObjectStateUpdate; + + // noinspection resource + final StreamToBlinkTableAdapter adapter = new StreamToBlinkTableAdapter( + SessionStateExportObjectUpdateStreamPublisher.definition(), updateStream, + ExecutionContext.getContext().getUpdateGraph(), NOTIFICATION_INFO); + blinkTable = adapter.table(); + state.setField(NOTIFICATION_INFO, blinkTable); + + final int ringSize = ringSize(); + if (ringSize > 0) { + state.setField(NOTIFICATION_INFO_RING, RingTableTools.of(blinkTable, ringSize)); + } + return state; + } +} diff --git a/server/src/main/java/io/deephaven/server/grpc_api_app/SessionStateExportObjectUpdateStreamPublisher.java b/server/src/main/java/io/deephaven/server/grpc_api_app/SessionStateExportObjectUpdateStreamPublisher.java new file mode 100644 index 00000000000..24e34c7d84b --- /dev/null +++ b/server/src/main/java/io/deephaven/server/grpc_api_app/SessionStateExportObjectUpdateStreamPublisher.java @@ -0,0 +1,84 @@ +package io.deephaven.server.grpc_api_app; + +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; +import io.deephaven.proto.backplane.grpc.ExportNotification; +import io.deephaven.qst.type.Type; +import io.deephaven.stream.StreamChunkUtils; +import io.deephaven.stream.StreamConsumer; +import io.deephaven.stream.StreamPublisher; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.BooleanUtils; +import org.jetbrains.annotations.NotNull; + +import java.util.List; +import java.util.Objects; + +final class SessionStateExportObjectUpdateStreamPublisher implements StreamPublisher { + + private static final TableDefinition DEFINITION = TableDefinition.of( + ColumnDefinition.ofTime("Time"), + ColumnDefinition.ofString("SessionId"), + ColumnDefinition.ofString("ExportId"), + ColumnDefinition.ofBoolean("IsNonExport"), + ColumnDefinition.ofString("State"), + ColumnDefinition.ofBoolean("IsLive"), + ColumnDefinition.ofString("Description"), + ColumnDefinition.of("Dependencies", Type.find(String[].class))); + private static final int CHUNK_SIZE = ArrayBackedColumnSource.BLOCK_SIZE; + + public static TableDefinition definition() { + return DEFINITION; + } + + private WritableChunk[] chunks; + private StreamConsumer consumer; + + SessionStateExportObjectUpdateStreamPublisher() { + chunks = StreamChunkUtils.makeChunksForDefinition(DEFINITION, CHUNK_SIZE); + } + + @Override + public void register(@NotNull StreamConsumer consumer) { + if (this.consumer != null) { + throw new IllegalStateException("Can not register multiple StreamConsumers."); + } + this.consumer = Objects.requireNonNull(consumer); + } + + public synchronized void onExportObjectStateUpdate( + String sessionId, String exportId, String description, ExportNotification.State state, boolean isLive, + boolean isNonExport, List remainingParents) { + chunks[0].asWritableLongChunk().add(DateTimeUtils.epochNanos(DateTimeUtils.now())); + chunks[1].asWritableObjectChunk().add(sessionId); + chunks[2].asWritableObjectChunk().add(exportId); + chunks[3].asWritableByteChunk().add(BooleanUtils.booleanAsByte(isNonExport)); + chunks[4].asWritableObjectChunk().add(state.name()); + chunks[5].asWritableByteChunk().add(BooleanUtils.booleanAsByte(isLive)); + chunks[6].asWritableObjectChunk().add(description); + chunks[7].asWritableObjectChunk().add(remainingParents.toArray(String[]::new)); + + if (chunks[0].size() == CHUNK_SIZE) { + flushInternal(); + } + } + + @Override + public synchronized void flush() { + if (chunks[0].size() == 0) { + return; + } + flushInternal(); + } + + @Override + public void shutdown() {} + + private void flushInternal() { + consumer.accept(chunks); + chunks = StreamChunkUtils.makeChunksForDefinition(DEFINITION, CHUNK_SIZE); + } +} diff --git a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java index ec5ea4fa947..78312970ffa 100644 --- a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java @@ -269,6 +269,7 @@ public void fetchObject( final SessionState.ExportObject object = ticketRouter.resolve( session, request.getSourceId().getTicket(), "sourceId"); session.nonExport() + .description("ObjectService#fetchObject") .require(object) .onError(responseObserver) .submit(() -> { @@ -366,7 +367,8 @@ public void onData(ByteBuffer message, Object[] references) throws ObjectCommuni for (Object reference : references) { final String type = typeLookup.type(reference).orElse(null); - final ExportObject exportObject = sessionState.newServerSideExport(reference); + final ExportObject exportObject = + sessionState.newServerSideExport(reference, "pluginMessage#ondata"); exports.add(exportObject); TypedTicket typedTicket = ticketForExport(exportObject, type); payload.addExportedReferences(typedTicket); diff --git a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java index 91effc96849..2c006ea6af8 100644 --- a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java @@ -59,6 +59,7 @@ public void partitionBy( ticketRouter.resolve(session, request.getTableId(), "tableId"); session.newExport(request.getResultId(), "resultId") + .description("PartitionedTableService#partitionBy") .require(targetTable) .onError(responseObserver) .submit(() -> { @@ -81,6 +82,7 @@ public void merge( ticketRouter.resolve(session, request.getPartitionedTable(), "partitionedTable"); session.newExport(request.getResultId(), "resultId") + .description("PartitionedTableService#merge") .require(partitionedTable) .onError(responseObserver) .submit(() -> { @@ -113,6 +115,7 @@ public void getTable( ticketRouter.resolve(session, request.getKeyTableTicket(), "keyTableTicket"); session.newExport(request.getResultId(), "resultId") + .description("PartitionedTableService#getTable") .require(partitionedTable, keys) .onError(responseObserver) .submit(() -> { diff --git a/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java b/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java index 126d1fba760..9d18f8ae84e 100644 --- a/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java +++ b/server/src/main/java/io/deephaven/server/session/ExportTicketResolver.java @@ -47,6 +47,7 @@ public SessionState.ExportObject flightInfoFor( final SessionState.ExportObject export = resolve(session, descriptor, logId); return session.nonExport() + .description("ExportTicketResolver#flightInfoFor") .require(export) .submit(() -> { if (export.get() instanceof Table) { @@ -94,6 +95,7 @@ public SessionState.ExportBuilder publish( @Nullable final Runnable onPublish) { final SessionState.ExportBuilder toPublish = session.newExport(ExportTicketHelper.ticketToExportId(ticket, logId)); + toPublish.setStateToPublishing(); if (onPublish != null) { session.nonExport() .require(toPublish.getExport()) @@ -110,6 +112,7 @@ public SessionState.ExportBuilder publish( @Nullable final Runnable onPublish) { final SessionState.ExportBuilder toPublish = session.newExport(FlightExportTicketHelper.descriptorToExportId(descriptor, logId)); + toPublish.setStateToPublishing(); if (onPublish != null) { session.nonExport() .require(toPublish.getExport()) diff --git a/server/src/main/java/io/deephaven/server/session/SessionService.java b/server/src/main/java/io/deephaven/server/session/SessionService.java index 52bcf46f2e1..a990171e735 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionService.java +++ b/server/src/main/java/io/deephaven/server/session/SessionService.java @@ -41,6 +41,9 @@ @Singleton public class SessionService { + // This is set and unset via ConsoleSericeGrpcImpl#executeCommand under the serial executor. + public static volatile String CURRENT_SESSION_ID; + private static final Logger log = LoggerFactory.getLogger(SessionService.class); /** diff --git a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java index cd4ce627a59..b49298434cf 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java @@ -168,6 +168,7 @@ public void exportFromTicket( final SessionState.ExportObject source = ticketRouter.resolve( session, request.getSourceId(), "sourceId"); session.newExport(request.getResultId(), "resultId") + .description("SessionService#exportFromTicket") .require(source) .onError(responseObserver) .submit(() -> { diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 2835627f091..9482403df44 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -32,6 +32,7 @@ import io.deephaven.proto.flight.util.FlightExportTicketHelper; import io.deephaven.proto.util.Exceptions; import io.deephaven.proto.util.ExportTicketHelper; +import io.deephaven.server.grpc_api_app.GrpcApiApplication; import io.deephaven.server.util.Scheduler; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.util.SafeCloseable; @@ -49,7 +50,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -57,6 +57,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.stream.Collectors; import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; @@ -90,9 +91,19 @@ * */ public class SessionState { + public interface Listener { + void onExportObjectStateChange( + String sessionId, String exportId, String description, ExportNotification.State state, boolean isLive, + boolean isNonExport, List remainingParents); + } + + public static Listener LISTENER = null; + // Some work items will be dependent on other exports, but do not export anything themselves. public static final int NON_EXPORT_ID = 0; + private static final String EXPORT_DESCRIPTION_NOT_SET = "{no description set}"; + @AssistedFactory public interface Factory { SessionState create(AuthContext authContext); @@ -102,11 +113,14 @@ public interface Factory { * Wrap an object in an ExportObject to make it conform to the session export API. * * @param export the object to wrap + * @param description a terse description of the export * @param the type of the object * @return a sessionless export object */ - public static ExportObject wrapAsExport(final T export) { - return new ExportObject<>(export); + public static ExportObject wrapAsExport(final T export, final String description) { + ExportObject exportObject = new ExportObject<>(export); + exportObject.setDescription(description); + return exportObject; } /** @@ -229,7 +243,14 @@ public SessionService.TokenExpiration getExpiration() { } /** - * @return whether or not this session is expired + * @return the session id + */ + public String getSessionId() { + return sessionId; + } + + /** + * @return whether this session is expired */ public boolean isExpired() { final SessionService.TokenExpiration currToken = expiration; @@ -332,10 +353,11 @@ public ExportObject getExportIfExists(final Ticket ticket, final String l * known in advance by the requesting client. * * @param export the result of the export + * @param description a terse description of the export * @param the export type * @return the ExportObject for this item for ease of access to the export */ - public ExportObject newServerSideExport(final T export) { + public ExportObject newServerSideExport(final T export, final String description) { if (isExpired()) { throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); } @@ -345,6 +367,7 @@ public ExportObject newServerSideExport(final T export) { // noinspection unchecked final ExportObject result = (ExportObject) exportMap.putIfAbsent(exportId, EXPORT_OBJECT_VALUE_FACTORY); result.setResult(export); + result.setDescription(description); return result; } @@ -533,6 +556,9 @@ public final static class ExportObject extends LivenessArtifact { /** Indicates whether this export has already been well defined. This prevents export object reuse. */ private boolean hasHadWorkSet = false; + /** a terse description of the export to aid in client-side debugging */ + private String description = EXPORT_DESCRIPTION_NOT_SET; + /** This indicates whether or not this export should use the serial execution queue. */ private boolean requiresSerialQueue; @@ -636,6 +662,19 @@ private synchronized void setDependencies(final List> parents) { } } + /** + * Sets the description of the export. The description is to aid in debugging and is visible on the + * {@link GrpcApiApplication#getBlinkTable()} stream table. + * + * @param description a terse description of the export + */ + private synchronized void setDescription(final String description) { + if (description != null) { + this.description = description; + updateStreamListener(); + } + } + /** * Sets the dependencies and initializes the relevant data structures to include this export as a child for * each. @@ -770,6 +809,7 @@ private synchronized void setState(final ExportNotification.State state) { } this.state = state; + updateStreamListener(); // Send an export notification before possibly notifying children of our state change. if (exportId != NON_EXPORT_ID) { log.debug().append(session.logPrefix).append("export '").append(logIdentity) @@ -816,6 +856,23 @@ private synchronized void setState(final ExportNotification.State state) { } } + private void updateStreamListener() { + Assert.holdsLock(this, "must hold lock to export object"); + if (LISTENER == null) { + return; + } + final boolean isLive = tryRetainReference(); + List parentIds = parents.stream() + .map(p -> p.logIdentity) + .collect(Collectors.toList()); + final String sessionId = session == null ? "session-less" : session.sessionId; + LISTENER.onExportObjectStateChange( + sessionId, logIdentity, description, state, isLive, isNonExport(), parentIds); + if (isLive) { + dropReference(); + } + } + /** * Decrements parent counter and kicks off the export if that was the last dependency. * @@ -869,6 +926,9 @@ private void onResolveOne(@Nullable final ExportObject parent) { final int newDepCount = DEPENDENT_COUNT_UPDATER.decrementAndGet(this); if (newDepCount > 0) { + synchronized (this) { + updateStreamListener(); + } return; // either more dependencies to wait for or this export has already failed } Assert.eqZero(newDepCount, "newDepCount"); @@ -1018,7 +1078,9 @@ public synchronized void release() { } setState(ExportNotification.State.RELEASED); } else if (!isExportStateTerminal(state)) { - session.nonExport().require(this).submit(this::release); + session.nonExport() + .description("SessionState#release") + .require(this).submit(this::release); } } @@ -1047,6 +1109,7 @@ protected synchronized void destroy() { if (!(caughtException instanceof StatusRuntimeException)) { caughtException = null; } + updateStreamListener(); } /** @@ -1186,7 +1249,7 @@ private void initialize(final int versionId) { notify(ExportNotification.newBuilder() .setTicket(ExportTicketHelper.wrapExportIdInTicket(NON_EXPORT_ID)) .setExportState(ExportNotification.State.EXPORTED) - .setContext("run is complete") + .setContext("export listener refresh is complete") .build()); log.debug().append(logPrefix).append("run complete for listener ").append(id).endl(); } @@ -1351,6 +1414,11 @@ public ExportBuilder onError(StreamObserver streamObserver) { }); } + public ExportBuilder description(final String description) { + export.setDescription(description); + return this; + } + /** * This method is the final method for submitting an export to the session. The provided callable is enqueued on * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are @@ -1400,6 +1468,16 @@ public ExportObject getExport() { public int getExportId() { return exportId; } + + public void setStateToPublishing() { + synchronized (export) { + if (export.getState() != ExportNotification.State.UNKNOWN) { + throw new IllegalStateException( + "cannot set state to publishing, state is already: " + export.getState()); + } + export.setState(ExportNotification.State.PUBLISHING); + } + } } private static final KeyedIntObjectKey> EXPORT_OBJECT_ID_KEY = diff --git a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java index 4a52597c444..c42c19edea2 100644 --- a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java @@ -58,6 +58,7 @@ public void addTableToInputTable( ticketRouter.resolve(session, request.getTableToAdd(), "tableToAdd"); session.nonExport() + .description("InputTableServiceGrpcImpl#addTableToInputTable") .requiresSerialQueue() .onError(responseObserver) .require(targetTable, tableToAddExport) @@ -106,6 +107,7 @@ public void deleteTableFromInputTable( ticketRouter.resolve(session, request.getTableToRemove(), "tableToDelete"); session.nonExport() + .description("InputTableServiceGrpcImpl#deleteTableFromInputTable") .requiresSerialQueue() .onError(responseObserver) .require(targetTable, tableToDeleteExport) diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 16d41bfef6f..b72d0cd43a9 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -573,6 +573,7 @@ public void getExportedTableCreationResponse( final SessionState.ExportObject export = ticketRouter.resolve(session, request, "request"); session.nonExport() + .description("TableService#getExportedTableCreationResponse") .require(export) .onError(responseObserver) .submit(() -> { @@ -617,6 +618,7 @@ private void oneShotOperationWrapper( .collect(Collectors.toList()); session.newExport(resultId, "resultId") + .description("TableService#" + op.name()) .require(dependencies) .onError(responseObserver) .submit(() -> { @@ -661,6 +663,7 @@ private BatchExportBuilder createBatchExportBuilder(SessionState session, final Ticket resultId = operation.getResultTicket(request); final ExportBuilder
exportBuilder = resultId.getTicket().isEmpty() ? session.nonExport() : session.newExport(resultId, "resultId"); + exportBuilder.description("TableService#" + op.getOpCase().name() + "(batch)"); return new BatchExportBuilder<>(operation, request, exportBuilder); } diff --git a/server/src/main/java/io/deephaven/server/util/GrpcServiceOverrideBuilder.java b/server/src/main/java/io/deephaven/server/util/GrpcServiceOverrideBuilder.java index 8e0dc7868da..29c87fbaa87 100644 --- a/server/src/main/java/io/deephaven/server/util/GrpcServiceOverrideBuilder.java +++ b/server/src/main/java/io/deephaven/server/util/GrpcServiceOverrideBuilder.java @@ -243,6 +243,7 @@ public void invokeOpen( // if this isn't a half-close, we should export it for later calls - if it is, the client won't send // more messages session.newExport(streamData.getRpcTicket(), "rpcTicket") + .description("BrowserStream#Open") // not setting an onError here, failure can only happen if the session ends .submit(() -> browserStream); } @@ -262,6 +263,7 @@ public void invokeNext( session.getExport(streamData.getRpcTicket(), "rpcTicket"); session.nonExport() + .description("BrowserStream#Next") .require(browserStream) .onError(responseObserver) .submit(() -> { diff --git a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java index d77de8205ac..17baaacf611 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java @@ -107,7 +107,7 @@ public void testServerExportDestroyOnExportRelease() { final CountingLivenessReferent export = new CountingLivenessReferent(); final SessionState.ExportObject exportObj; try (final SafeCloseable ignored = LivenessScopeStack.open()) { - exportObj = session.newServerSideExport(export); + exportObj = session.newServerSideExport(export, "test"); } // better have ref count @@ -154,7 +154,7 @@ public void testServerExportDestroyOnSessionRelease() { final CountingLivenessReferent export = new CountingLivenessReferent(); final SessionState.ExportObject exportObj; try (final SafeCloseable ignored = LivenessScopeStack.open()) { - exportObj = session.newServerSideExport(export); + exportObj = session.newServerSideExport(export, "test"); } // better have ref count @@ -484,7 +484,7 @@ public void testExpiredNewNonExport() { @Test public void testExpiredServerSideExport() { final CountingLivenessReferent export = new CountingLivenessReferent(); - final SessionState.ExportObject exportObj = session.newServerSideExport(export); + final SessionState.ExportObject exportObj = session.newServerSideExport(export, "test"); session.onExpired(); expectException(StatusRuntimeException.class, exportObj::get); } @@ -492,7 +492,7 @@ public void testExpiredServerSideExport() { @Test public void testExpiresBeforeExport() { session.onExpired(); - expectException(StatusRuntimeException.class, () -> session.newServerSideExport(new Object())); + expectException(StatusRuntimeException.class, () -> session.newServerSideExport(new Object(), "test")); expectException(StatusRuntimeException.class, () -> session.nonExport()); expectException(StatusRuntimeException.class, () -> session.newExport(nextExportId++)); expectException(StatusRuntimeException.class, () -> session.getExport(nextExportId++)); @@ -659,7 +659,7 @@ public void testExpiredByTime() { session.updateExpiration( new SessionService.TokenExpiration(UUID.randomUUID(), scheduler.currentTimeMillis(), session)); Assert.eqNull(session.getExpiration(), "session.getExpiration()"); // already expired - expectException(StatusRuntimeException.class, () -> session.newServerSideExport(new Object())); + expectException(StatusRuntimeException.class, () -> session.newServerSideExport(new Object(), "test")); expectException(StatusRuntimeException.class, () -> session.nonExport()); expectException(StatusRuntimeException.class, () -> session.newExport(nextExportId++)); expectException(StatusRuntimeException.class, () -> session.getExport(nextExportId++)); @@ -1207,9 +1207,9 @@ public void testExportListenerNewExportAfterRefreshComplete() { @Test public void testExportListenerServerSideExports() { final QueueingExportListener listener = new QueueingExportListener(); - final SessionState.ExportObject e1 = session.newServerSideExport(session); + final SessionState.ExportObject e1 = session.newServerSideExport(session, "test"); session.addExportListener(listener); - final SessionState.ExportObject e2 = session.newServerSideExport(session); + final SessionState.ExportObject e2 = session.newServerSideExport(session, "test"); listener.validateIsRefreshComplete(1); listener.validateNotificationQueue(e1, EXPORTED); diff --git a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java index e6eff349461..25b2002c8c6 100644 --- a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java +++ b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java @@ -82,7 +82,7 @@ public void testLifeCycleStaticTable() { // create and export the table final QueryTable src = TstUtils.testTable(RowSetFactory.flat(100).toTracking()); - final SessionState.ExportObject t1 = session.newServerSideExport(src); + final SessionState.ExportObject t1 = session.newServerSideExport(src, "test"); // validate we receive an initial table size update expectSizes(t1.getExportId(), 100); @@ -96,7 +96,7 @@ public void testLifeCycleStaticTable() { public void testRefreshStaticTable() { // create and export the table final QueryTable src = TstUtils.testTable(RowSetFactory.flat(1024).toTracking()); - final SessionState.ExportObject t1 = session.newServerSideExport(src); + final SessionState.ExportObject t1 = session.newServerSideExport(src, "test"); // now add the listener final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); @@ -124,7 +124,7 @@ public void testLifeCycleTickingTable() { final QueryTable src = TstUtils.testRefreshingTable(RowSetFactory.flat(42).toTracking()); final SessionState.ExportObject t1; try (final SafeCloseable scope = LivenessScopeStack.open()) { - t1 = session.newServerSideExport(src); + t1 = session.newServerSideExport(src, "test"); } // validate we receive an initial table size update @@ -146,7 +146,7 @@ public void testRefreshTickingTable() { final QueryTable src = TstUtils.testRefreshingTable(RowSetFactory.flat(42).toTracking()); final SessionState.ExportObject t1; try (final SafeCloseable scope = LivenessScopeStack.open()) { - t1 = session.newServerSideExport(src); + t1 = session.newServerSideExport(src, "test"); } // now add the listener @@ -173,7 +173,7 @@ public void testSessionClose() { // create and export the table final QueryTable src = TstUtils.testRefreshingTable(RowSetFactory.flat(42).toTracking()); // create t1 in global query scope - final SessionState.ExportObject t1 = session.newServerSideExport(src); + final SessionState.ExportObject t1 = session.newServerSideExport(src, "test"); // now add the listener final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); @@ -205,7 +205,7 @@ public void testPropagatesError() { final QueryTable src = TstUtils.testRefreshingTable(RowSetFactory.flat(42).toTracking()); final SessionState.ExportObject t1; try (final SafeCloseable scope = LivenessScopeStack.open()) { - t1 = session.newServerSideExport(src); + t1 = session.newServerSideExport(src, "test"); } // now add the listener @@ -238,7 +238,7 @@ public void testListenerClosed() { final QueryTable src = TstUtils.testRefreshingTable(RowSetFactory.flat(42).toTracking()); final SessionState.ExportObject t1; try (final SafeCloseable scope = LivenessScopeStack.open()) { - t1 = session.newServerSideExport(src); + t1 = session.newServerSideExport(src, "test"); } // now add the listener @@ -298,7 +298,7 @@ public void testTableSizeUsesPrev() { // Must be off-thread to use concurrent instantiation final Thread thread = new Thread(() -> { try (final SafeCloseable scope = LivenessScopeStack.open()) { - t1.setValue(session.newServerSideExport(src)); + t1.setValue(session.newServerSideExport(src, "test")); } }); thread.start(); diff --git a/server/src/test/java/io/deephaven/server/table/ops/GrpcTableOperationTestBase.java b/server/src/test/java/io/deephaven/server/table/ops/GrpcTableOperationTestBase.java index ea92c579481..8e28b0ee75a 100644 --- a/server/src/test/java/io/deephaven/server/table/ops/GrpcTableOperationTestBase.java +++ b/server/src/test/java/io/deephaven/server/table/ops/GrpcTableOperationTestBase.java @@ -45,7 +45,7 @@ public void tearDown() throws Exception { } public TableReference ref(Table table) { - final ExportObject
export = authenticatedSessionState().newServerSideExport(table); + final ExportObject
export = authenticatedSessionState().newServerSideExport(table, "test"); exports.add(export); return ref(export); }