diff --git a/ClientSupport/src/main/java/io/deephaven/clientsupport/plotdownsampling/RunChartDownsample.java b/ClientSupport/src/main/java/io/deephaven/clientsupport/plotdownsampling/RunChartDownsample.java index f963fb74576..f4d4a765df6 100644 --- a/ClientSupport/src/main/java/io/deephaven/clientsupport/plotdownsampling/RunChartDownsample.java +++ b/ClientSupport/src/main/java/io/deephaven/clientsupport/plotdownsampling/RunChartDownsample.java @@ -109,12 +109,12 @@ public Table apply(final Table wholeTable) { // TODO restore this to support non-QueryTable types // if (wholeTable instanceof BaseTable) { // BaseTable baseTable = (BaseTable) wholeTable; - // final SwapListener swapListener = - // baseTable.createSwapListenerIfRefreshing(SwapListener::new); + // final SimpleSnapshotControl snapshotControl = + // baseTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); // // final Mutable result = new MutableObject<>(); // - // baseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> { + // baseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> { // final boolean usePrev = prevRequested && baseTable.isRefreshing(); // final WritableRowSet rowSetToUse = usePrev ? baseTable.build().copyPrev() : baseTable.build(); // @@ -136,20 +136,20 @@ public Table apply(final Table wholeTable) { } private static Table makeDownsampledQueryTable(final QueryTable wholeQueryTable, final DownsampleKey memoKey) { - final SwapListener swapListener = - wholeQueryTable.createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + wholeQueryTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); final Mutable result = new MutableObject<>(); - BaseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> { + BaseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> { final boolean usePrev = prevRequested && wholeQueryTable.isRefreshing(); final DownsamplerListener downsampleListener = DownsamplerListener.of(wholeQueryTable, memoKey); downsampleListener.init(usePrev); result.setValue(downsampleListener.resultTable); - if (swapListener != null) { - swapListener.setListenerAndResult(downsampleListener, downsampleListener.resultTable); + if (snapshotControl != null) { + snapshotControl.setListenerAndResult(downsampleListener, downsampleListener.resultTable); downsampleListener.resultTable.addParentReference(downsampleListener); } @@ -694,7 +694,7 @@ private void performRescans(final DownsampleChunkContext context) { /** * Indicates that a change has probably happened and we should notify the result table. The contents of the * change will be our state map (i.e. there is - * + * * @param upstream the change that happened upstream * @param lastRowSet the base rowSet to use when considering what items to tell the result table changed. if * this.rowSet, then update it normally, otherwise this.rowSet must be empty and this.rowSet should be diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index 3afd719235e..ac6deca3c66 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -790,6 +790,16 @@ RollupTable rollup(Collection aggregations, boolean inclu */ void addUpdateListener(TableUpdateListener listener); + /** + * Subscribe for updates to this table. {@code listener} will be invoked via the {@link NotificationQueue} + * associated with this Table. + * + * @param requiredLastNotificationStep fail to add update listener if the last notification step is not this + * @param listener listener for updates + * @return true if the listener was added, false if the last notification step requirement was not met + */ + boolean addUpdateListener(final long requiredLastNotificationStep, final TableUpdateListener listener); + /** * Unsubscribe the supplied listener. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java index 8b9c3a1e243..7efa95c9418 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java @@ -51,10 +51,11 @@ public static Table toBlink(@NotNull final Table table) { final MutableObject resultHolder = new MutableObject<>(); final MutableObject listenerHolder = new MutableObject<>(); final BaseTable coalesced = (BaseTable) table.coalesce(); - final SwapListener swapListener = coalesced.createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + coalesced.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); // noinspection DataFlowIssue swapListener cannot be null here, since we know the table is refreshing - ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", swapListener.makeSnapshotControl(), + ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", snapshotControl, (final boolean usePrev, final long beforeClockValue) -> { // Start with the same rows as the original table final TrackingRowSet resultRowSet = usePrev @@ -69,7 +70,7 @@ public static Table toBlink(@NotNull final Table table) { final AddOnlyToBlinkListener listener = new AddOnlyToBlinkListener(recorder, result); recorder.setMergedListener(listener); result.addParentReference(listener); - swapListener.setListenerAndResult(recorder, result); + snapshotControl.setListenerAndResult(recorder, result); listenerHolder.setValue(listener); resultHolder.setValue(result); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java index 415ddf68670..8c644c7f923 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java @@ -557,7 +557,7 @@ public void addUpdateListener(final ShiftObliviousListener listener, final boole } @Override - public void addUpdateListener(final TableUpdateListener listener) { + public void addUpdateListener(@NotNull final TableUpdateListener listener) { if (isFailed) { throw new IllegalStateException("Can not listen to failed table " + description); } @@ -570,14 +570,38 @@ public void addUpdateListener(final TableUpdateListener listener) { } } + @Override + public boolean addUpdateListener( + final long requiredLastNotificationStep, @NotNull final TableUpdateListener listener) { + if (isFailed) { + throw new IllegalStateException("Can not listen to failed table " + description); + } + + if (!isRefreshing()) { + return false; + } + + synchronized (this) { + if (this.lastNotificationStep != requiredLastNotificationStep) { + return false; + } + + // ensure that listener is in the same update graph if applicable + if (listener instanceof NotificationQueue.Dependency) { + getUpdateGraph((NotificationQueue.Dependency) listener); + } + ensureChildListenerReferences().add(listener); + + return true; + } + } + private SimpleReferenceManager> ensureChildListenerReferences() { // noinspection unchecked return FieldUtils.ensureField(this, CHILD_LISTENER_REFERENCES_UPDATER, EMPTY_CHILD_LISTENER_REFERENCES, () -> new SimpleReferenceManager<>((final TableUpdateListener tableUpdateListener) -> { if (tableUpdateListener instanceof LegacyListenerAdapter) { return (LegacyListenerAdapter) tableUpdateListener; - } else if (tableUpdateListener instanceof SwapListener) { - return ((SwapListener) tableUpdateListener).getReferenceForSource(); } else { return new WeakSimpleReference<>(tableUpdateListener); } @@ -701,12 +725,14 @@ public final void notifyListeners(final TableUpdate update) { validateUpdateOverlaps(update); } - lastNotificationStep = currentStep; - // notify children - final NotificationQueue notificationQueue = getNotificationQueue(); - childListenerReferences.forEach( - (listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update))); + synchronized (this) { + lastNotificationStep = currentStep; + + final NotificationQueue notificationQueue = getNotificationQueue(); + childListenerReferences.forEach( + (listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update))); + } update.release(); } @@ -813,11 +839,14 @@ public final void notifyListenersOnError(final Throwable e, @Nullable final Tabl isFailed = true; maybeSignal(); - lastNotificationStep = currentStep; - final NotificationQueue notificationQueue = getNotificationQueue(); - childListenerReferences.forEach((listenerRef, listener) -> notificationQueue - .addNotification(listener.getErrorNotification(e, sourceEntry))); + synchronized (this) { + lastNotificationStep = currentStep; + + final NotificationQueue notificationQueue = getNotificationQueue(); + childListenerReferences.forEach((listenerRef, listener) -> notificationQueue + .addNotification(listener.getErrorNotification(e, sourceEntry))); + } } /** @@ -1261,34 +1290,35 @@ public Table setTotalsTable(String directive) { } public static void initializeWithSnapshot( - String logPrefix, SwapListener swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) { - if (swapListener == null) { + @NotNull final String logPrefix, + @Nullable final ConstructSnapshot.SnapshotControl snapshotControl, + @NotNull final ConstructSnapshot.SnapshotFunction snapshotFunction) { + if (snapshotControl == null) { snapshotFunction.call(false, LogicalClock.NULL_CLOCK_VALUE); return; } - ConstructSnapshot.callDataSnapshotFunction(logPrefix, swapListener.makeSnapshotControl(), snapshotFunction); + ConstructSnapshot.callDataSnapshotFunction(logPrefix, snapshotControl, snapshotFunction); } - public interface SwapListenerFactory { - T newListener(BaseTable sourceTable); + public interface SnapshotControlFactory { + T newControl(BaseTable sourceTable); } /** - * If we are a refreshing table, then we should create a swap listener that listens for updates to this table. - * + * If we are a refreshing table, then we should create a snapshot control to validate the snapshot. + *

* Otherwise, we return null. * - * @return a swap listener for this table (or null) + * @return a snapshot control to snapshot this table (or null) */ @Nullable - public T createSwapListenerIfRefreshing(final SwapListenerFactory factory) { + public T createSnapshotControlIfRefreshing( + final SnapshotControlFactory factory) { if (!isRefreshing()) { return null; } - final T swapListener = factory.newListener(this); - swapListener.subscribeForUpdates(); - return swapListener; + return factory.newControl(this); } // ------------------------------------------------------------------------------------------------------------------ diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java index 549541568f6..0331b70bdbb 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java @@ -69,13 +69,14 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size } final BaseTable baseBlinkTable = (BaseTable) blinkTable.coalesce(); - final SwapListener swapListener = baseBlinkTable.createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + baseBlinkTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); // blink tables must tick - Assert.neqNull(swapListener, "swapListener"); + Assert.neqNull(snapshotControl, "snapshotControl"); final Mutable resultHolder = new MutableObject<>(); - ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", swapListener.makeSnapshotControl(), + ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", snapshotControl, (boolean usePrev, long beforeClockValue) -> { final Map> columns = new LinkedHashMap<>(); final Map> columnSourceMap = @@ -119,7 +120,7 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size Assert.leq(result.size(), "result.size()", sizeLimit, "sizeLimit"); - swapListener.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly", + snapshotControl.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly", baseBlinkTable, result) { @Override public void onUpdate(TableUpdate upstream) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/NotificationStepReceiver.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/NotificationStepReceiver.java index 5eb4b961a43..31c1396e062 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/NotificationStepReceiver.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/NotificationStepReceiver.java @@ -4,7 +4,7 @@ package io.deephaven.engine.table.impl; /** - * Used by {@link SwapListener swap listeners} to set the notification step of elements in our DAG. + * Used by {@link SimpleSnapshotControl snapshot control} to set the notification step of elements in our DAG. */ public interface NotificationStepReceiver { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 5419f0ced8e..9c50f3d699d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -59,13 +59,10 @@ import io.deephaven.engine.util.*; import io.deephaven.engine.util.systemicmarking.SystemicObject; import io.deephaven.util.annotations.InternalUseOnly; -import io.deephaven.util.annotations.ReferentialIntegrity; import io.deephaven.vector.Vector; -import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; import io.deephaven.engine.liveness.Liveness; -import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.table.impl.MemoizedOperationKey.SelectUpdateViewOrUpdateView.Flavor; import io.deephaven.engine.table.impl.by.*; import io.deephaven.engine.table.impl.locations.GroupingProvider; @@ -149,8 +146,8 @@ public Result(@NotNull final T resultNode, */ String getLogPrefix(); - default SwapListener newSwapListener(final QueryTable queryTable) { - return new SwapListener(queryTable); + default SimpleSnapshotControl newSnapshotControl(final QueryTable queryTable) { + return new SimpleSnapshotControl(queryTable); } /** @@ -1214,11 +1211,11 @@ private QueryTable whereInternal(final WhereFilter... filters) { } return memoizeResult(MemoizedOperationKey.filter(filters), () -> { - final SwapListener swapListener = - createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); final Mutable result = new MutableObject<>(); - initializeWithSnapshot("where", swapListener, + initializeWithSnapshot("where", snapshotControl, (prevRequested, beforeClock) -> { final boolean usePrev = prevRequested && isRefreshing(); final RowSet rowSetToUse = usePrev ? rowSet.prev() : rowSet; @@ -1298,7 +1295,7 @@ void handleUncaughtException(Exception throwable) { } } - if (swapListener != null) { + if (snapshotControl != null) { final ListenerRecorder recorder = new ListenerRecorder( "where(" + Arrays.toString(filters) + ")", QueryTable.this, filteredTable); @@ -1306,7 +1303,7 @@ void handleUncaughtException(Exception throwable) { log, this, recorder, filteredTable, filters); filteredTable.setWhereListener(whereListener); recorder.setMergedListener(whereListener); - swapListener.setListenerAndResult(recorder, filteredTable); + snapshotControl.setListenerAndResult(recorder, filteredTable); filteredTable.addParentReference(whereListener); } else if (refreshingFilters) { final WhereListener whereListener = new WhereListener( @@ -1648,46 +1645,51 @@ private Table viewOrUpdateView(Flavor flavor, final SelectColumn... viewColumns) updateDescription, sizeForInstrumentation(), () -> { final Mutable

result = new MutableObject<>(); - final SwapListener swapListener = - createSwapListenerIfRefreshing(SwapListener::new); - initializeWithSnapshot(humanReadablePrefix, swapListener, (usePrev, beforeClockValue) -> { - final boolean publishTheseSources = flavor == Flavor.UpdateView; - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, columns, rowSet, - getModifiedColumnSetForUpdates(), publishTheseSources, true, viewColumns); - final SelectColumn[] processedViewColumns = analyzerWrapper.getProcessedColumns() - .toArray(SelectColumn[]::new); - QueryTable queryTable = new QueryTable( - rowSet, analyzerWrapper.getPublishedColumnResources()); - if (swapListener != null) { - final Map effects = analyzerWrapper.calcEffects(); - final TableUpdateListener listener = - new ViewOrUpdateViewListener(updateDescription, this, queryTable, effects); - swapListener.setListenerAndResult(listener, queryTable); - } + final SimpleSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); + initializeWithSnapshot(humanReadablePrefix, snapshotControl, + (usePrev, beforeClockValue) -> { + final boolean publishTheseSources = flavor == Flavor.UpdateView; + final SelectAndViewAnalyzerWrapper analyzerWrapper = + SelectAndViewAnalyzer.create( + this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, columns, rowSet, + getModifiedColumnSetForUpdates(), publishTheseSources, true, + viewColumns); + final SelectColumn[] processedViewColumns = + analyzerWrapper.getProcessedColumns() + .toArray(SelectColumn[]::new); + QueryTable queryTable = new QueryTable( + rowSet, analyzerWrapper.getPublishedColumnResources()); + if (snapshotControl != null) { + final Map effects = analyzerWrapper.calcEffects(); + final TableUpdateListener listener = + new ViewOrUpdateViewListener(updateDescription, this, queryTable, + effects); + snapshotControl.setListenerAndResult(listener, queryTable); + } - propagateFlatness(queryTable); + propagateFlatness(queryTable); - copyAttributes(queryTable, - flavor == Flavor.UpdateView ? CopyAttributeOperation.UpdateView - : CopyAttributeOperation.View); - copySortableColumns(queryTable, processedViewColumns); - if (publishTheseSources) { - maybeCopyColumnDescriptions(queryTable, processedViewColumns); - } else { - maybeCopyColumnDescriptions(queryTable); - } - final SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = - flavor == Flavor.UpdateView - ? SelectAndViewAnalyzerWrapper.UpdateFlavor.UpdateView - : SelectAndViewAnalyzerWrapper.UpdateFlavor.View; - queryTable = analyzerWrapper.applyShiftsAndRemainingColumns( - this, queryTable, updateFlavor); + copyAttributes(queryTable, + flavor == Flavor.UpdateView ? CopyAttributeOperation.UpdateView + : CopyAttributeOperation.View); + copySortableColumns(queryTable, processedViewColumns); + if (publishTheseSources) { + maybeCopyColumnDescriptions(queryTable, processedViewColumns); + } else { + maybeCopyColumnDescriptions(queryTable); + } + final SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = + flavor == Flavor.UpdateView + ? SelectAndViewAnalyzerWrapper.UpdateFlavor.UpdateView + : SelectAndViewAnalyzerWrapper.UpdateFlavor.View; + queryTable = analyzerWrapper.applyShiftsAndRemainingColumns( + this, queryTable, updateFlavor); - result.setValue(queryTable); + result.setValue(queryTable); - return true; - }); + return true; + }); return result.getValue(); })); @@ -1788,10 +1790,10 @@ public Table dropColumns(String... columnNames) { newColumns.remove(columnName); } - final SwapListener swapListener = - createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); - initializeWithSnapshot("dropColumns", swapListener, (usePrev, beforeClockValue) -> { + initializeWithSnapshot("dropColumns", snapshotControl, (usePrev, beforeClockValue) -> { final QueryTable resultTable = new QueryTable(rowSet, newColumns); propagateFlatness(resultTable); @@ -1800,7 +1802,7 @@ public Table dropColumns(String... columnNames) { resultTable.getDefinition().getColumnNameMap()::containsKey); maybeCopyColumnDescriptions(resultTable); - if (swapListener != null) { + if (snapshotControl != null) { final ModifiedColumnSet.Transformer mcsTransformer = newModifiedColumnSetTransformer(resultTable, resultTable.getColumnSourceMap().keySet() @@ -1826,7 +1828,7 @@ public void onUpdate(final TableUpdate upstream) { resultTable.notifyListeners(downstream); } }; - swapListener.setListenerAndResult(listener, resultTable); + snapshotControl.setListenerAndResult(listener, resultTable); } result.setValue(resultTable); @@ -3355,16 +3357,17 @@ public QueryTable copy(TableDefinition definition, Predicate shouldCopy) return QueryPerformanceRecorder.withNugget("copy()", sizeForInstrumentation(), () -> { final Mutable result = new MutableObject<>(); - final SwapListener swapListener = createSwapListenerIfRefreshing(SwapListener::new); - initializeWithSnapshot("copy", swapListener, (usePrev, beforeClockValue) -> { + final SimpleSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); + initializeWithSnapshot("copy", snapshotControl, (usePrev, beforeClockValue) -> { final QueryTable resultTable = new CopiedTable(definition, this); propagateFlatness(resultTable); if (shouldCopy != StandardOptions.COPY_NONE) { copyAttributes(resultTable, shouldCopy); } - if (swapListener != null) { + if (snapshotControl != null) { final ListenerImpl listener = new ListenerImpl("copy()", this, resultTable); - swapListener.setListenerAndResult(listener, resultTable); + snapshotControl.setListenerAndResult(listener, resultTable); } result.setValue(resultTable); @@ -3539,23 +3542,22 @@ private T getResultNoMemo(fin return QueryPerformanceRecorder.withNugget(operation.getDescription(), sizeForInstrumentation(), () -> { final Mutable resultTable = new MutableObject<>(); - final SwapListener swapListener; + final SimpleSnapshotControl snapshotControl; if (isRefreshing() && operation.snapshotNeeded()) { - swapListener = operation.newSwapListener(this); - swapListener.subscribeForUpdates(); + snapshotControl = operation.newSnapshotControl(this); } else { - swapListener = null; + snapshotControl = null; } - initializeWithSnapshot(operation.getLogPrefix(), swapListener, (usePrev, beforeClockValue) -> { + initializeWithSnapshot(operation.getLogPrefix(), snapshotControl, (usePrev, beforeClockValue) -> { final Operation.Result result = operation.initialize(usePrev, beforeClockValue); if (result == null) { return false; } resultTable.setValue(result.resultNode); - if (swapListener != null) { - swapListener.setListenerAndResult(Require.neqNull(result.resultListener, "resultListener"), + if (snapshotControl != null) { + snapshotControl.setListenerAndResult(Require.neqNull(result.resultListener, "resultListener"), result.resultNode); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ReverseOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ReverseOperation.java index 007e9c0b0cb..702db2b04fc 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ReverseOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ReverseOperation.java @@ -4,7 +4,6 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.verify.Assert; -import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.impl.rsp.RspArray; @@ -64,8 +63,8 @@ public MemoizedOperationKey getMemoizedOperationKey() { } @Override - public SwapListener newSwapListener(QueryTable queryTable) { - return new SwapListener(queryTable) { + public SimpleSnapshotControl newSnapshotControl(QueryTable queryTable) { + return new SimpleSnapshotControl(queryTable) { @Override public synchronized boolean end(long clockCycle) { final boolean success = super.end(clockCycle); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControl.java new file mode 100644 index 00000000000..9a04ec0651c --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControl.java @@ -0,0 +1,171 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl; + +import io.deephaven.configuration.Configuration; +import io.deephaven.engine.table.TableUpdateListener; +import io.deephaven.engine.updategraph.LogicalClock; +import io.deephaven.engine.updategraph.*; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; +import io.deephaven.engine.table.impl.remote.ConstructSnapshot; +import org.jetbrains.annotations.NotNull; + +import javax.annotation.OverridingMethodsMustInvokeSuper; + +/** + * A simple implementation of {@link ConstructSnapshot.SnapshotControl} that uses the last notification step of the + * source table to determine whether to use previous values during initialization and to evaluate success. + */ +public class SimpleSnapshotControl implements ConstructSnapshot.SnapshotControl { + + static final boolean DEBUG = + Configuration.getInstance().getBooleanWithDefault("SwapListener.debug", false); + + private static final Logger log = LoggerFactory.getLogger(SimpleSnapshotControl.class); + + private TableUpdateListener eventualListener; + private NotificationStepReceiver eventualResult; + boolean success = false; + + /** + * The last clock cycle which the source table produced a notification. + */ + protected long lastNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + + /** + * The sourceTable, used to get the lastNotificationTime. + */ + final BaseTable sourceTable; + + public SimpleSnapshotControl(final BaseTable sourceTable) { + this.sourceTable = sourceTable; + } + + @Override + public UpdateGraph getUpdateGraph() { + return sourceTable.getUpdateGraph(); + } + + /** + * Starts a snapshot. + * + * @param beforeClockValue the logical clock value we are starting a snapshot on + * @return true if we should use previous values, false if we should use current values. + */ + @Override + public synchronized Boolean usePreviousValues(final long beforeClockValue) { + lastNotificationStep = sourceTable.getLastNotificationStep(); + success = false; + + final long beforeStep = LogicalClock.getStep(beforeClockValue); + final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); + + final boolean idle = beforeState == LogicalClock.State.Idle; + final boolean updatedOnThisStep = beforeStep == lastNotificationStep; + final boolean satisfied; + try { + satisfied = idle || updatedOnThisStep || sourceTable.satisfied(beforeStep); + } catch (ClockInconsistencyException e) { + return null; + } + final boolean usePrev = !satisfied; + + if (DEBUG) { + log.info().append("SimpleSnapshotControl {source=").append(System.identityHashCode(sourceTable)) + .append(", control=").append(System.identityHashCode(this)) + .append("} Start: beforeStep=").append(beforeStep) + .append(", beforeState=").append(beforeState.name()) + .append(", lastNotificationStep=").append(lastNotificationStep) + .append(", satisfied=").append(satisfied) + .append(", usePrev=").append(usePrev) + .endl(); + } + return usePrev; + } + + @Override + public boolean snapshotConsistent(long currentClockValue, boolean usingPreviousValues) { + return isInInitialNotificationWindow(); + } + + @Override + public final boolean snapshotCompletedConsistently(long afterClockValue, boolean usedPreviousValues) { + return end(afterClockValue); + } + + /** + * Ends a snapshot. Overriding methods must call {@code super} in order to ensure that the result's last + * notification step is properly set. + * + * @param clockCycle The {@link LogicalClock logical clock} cycle we are ending a snapshot on + * @return true if the snapshot was successful, false if we should try again. + * @throws IllegalStateException If the snapshot was successful (consistent), but the snapshot function failed to + * set the eventual listener or eventual result + */ + @OverridingMethodsMustInvokeSuper + protected synchronized boolean end(@SuppressWarnings("unused") final long clockCycle) { + if (isInInitialNotificationWindow()) { + if (eventualResult == null) { + throw new IllegalStateException("Result has not been set on end!"); + } + success = true; + } else { + success = false; + } + + if (DEBUG) { + log.info().append("SimpleSnapshotControl {source=").append(System.identityHashCode(sourceTable)) + .append(" control=").append(System.identityHashCode(this)) + .append("} End: success=").append(success) + .append(", last=").append(lastNotificationStep) + .endl(); + } + + if (success) { + eventualResult.setLastNotificationStep(lastNotificationStep); + success = subscribeForUpdates(eventualListener); + } + + return success; + } + + /** + * @return Whether we are in the initial notification window and can continue with the snapshot + */ + protected boolean isInInitialNotificationWindow() { + final long newNotificationStep = sourceTable.getLastNotificationStep(); + return lastNotificationStep == newNotificationStep; + } + + /** + * Subscribe for updates from the source table. + * + * @param listener The listener to subscribe + * @return Whether the subscription was successful + */ + protected boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) { + return sourceTable.addUpdateListener(lastNotificationStep, listener); + } + + /** + * Set the listener that will eventually become the listener, if we have a successful snapshot. + * + * @param listener The listener that we will eventually forward all updates to + * @param resultTable The table that will result from this operation + */ + public synchronized void setListenerAndResult( + @NotNull final TableUpdateListener listener, + @NotNull final NotificationStepReceiver resultTable) { + eventualListener = listener; + eventualResult = resultTable; + if (DEBUG) { + log.info().append("SimpleSnapshotControl {source=").append(System.identityHashCode(sourceTable)) + .append(", control=").append(System.identityHashCode(this)) + .append(", result=").append(System.identityHashCode(resultTable)) + .append('}') + .endl(); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControlEx.java similarity index 70% rename from engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java rename to engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControlEx.java index 05e6b74e881..4333cc3fd9e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListenerEx.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SimpleSnapshotControlEx.java @@ -2,7 +2,6 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.TableUpdateListener; -import io.deephaven.engine.table.impl.remote.ConstructSnapshot; import io.deephaven.engine.updategraph.ClockInconsistencyException; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.WaitNotification; @@ -11,35 +10,27 @@ import org.jetbrains.annotations.NotNull; /** - * Variant of {@link SwapListener} that considers an "extra" {@link NotificationStepSource} in addition to the source - * {@link BaseTable} when determining whether to use previous values during initialization or evaluating success. This - * is useful anytime an operation needs to listen to and snapshot one data source while also snapshotting another. + * Variant of {@link SimpleSnapshotControl} that considers an "extra" {@link NotificationStepSource} in addition to the + * source {@link BaseTable} when determining whether to use previous values during initialization or evaluating success. + * This is useful anytime an operation needs to listen to and snapshot one data source while also snapshotting another. */ -public final class SwapListenerEx extends SwapListener { +public final class SimpleSnapshotControlEx extends SimpleSnapshotControl { - private static final Logger log = LoggerFactory.getLogger(SwapListenerEx.class); + private static final Logger log = LoggerFactory.getLogger(SimpleSnapshotControlEx.class); private final NotificationStepSource extra; private long extraLastNotificationStep; - public SwapListenerEx(@NotNull final BaseTable sourceTable, @NotNull final NotificationStepSource extra) { + public SimpleSnapshotControlEx(@NotNull final BaseTable sourceTable, + @NotNull final NotificationStepSource extra) { super(sourceTable); this.extra = extra; } @Override - public ConstructSnapshot.SnapshotControl makeSnapshotControl() { - return ConstructSnapshot.makeSnapshotControl( - sourceTable.getUpdateGraph(), - this::startWithExtra, - (final long currentClockValue, final boolean usingPreviousValues) -> isInInitialNotificationWindow() - && extra.getLastNotificationStep() == extraLastNotificationStep, - (final long afterClockValue, final boolean usedPreviousValues) -> end(afterClockValue)); - } - @SuppressWarnings("AutoBoxing") - public synchronized Boolean startWithExtra(final long beforeClockValue) { + public synchronized Boolean usePreviousValues(final long beforeClockValue) { lastNotificationStep = sourceTable.getLastNotificationStep(); extraLastNotificationStep = extra.getLastNotificationStep(); success = false; @@ -78,7 +69,7 @@ public synchronized Boolean startWithExtra(final long beforeClockValue) { if (DEBUG) { log.info().append("SwapListenerEx {source=").append(System.identityHashCode(sourceTable)) .append(", extra=").append(System.identityHashCode(extra)) - .append(", swap=").append(System.identityHashCode(this)) + .append(", control=").append(System.identityHashCode(this)) .append("} Start: beforeStep=").append(beforeStep) .append(", beforeState=").append(beforeState.name()) .append(", sourceLastNotificationStep=").append(lastNotificationStep) @@ -92,14 +83,9 @@ public synchronized Boolean startWithExtra(final long beforeClockValue) { } @Override - public Boolean start(final long beforeClockValue) { - throw new UnsupportedOperationException("Use startWithExtra"); - } - - @Override - public synchronized boolean end(final long afterClockValue) { + protected synchronized boolean end(final long afterClockValue) { if (DEBUG) { - log.info().append("SwapListenerEx end() swap=").append(System.identityHashCode(this)) + log.info().append("SimpleSnapshotControlEx end() control=").append(System.identityHashCode(this)) .append(", end={").append(LogicalClock.getStep(afterClockValue)).append(",") .append(LogicalClock.getState(afterClockValue).toString()) .append("}, last=").append(sourceTable.getLastNotificationStep()) @@ -114,9 +100,14 @@ public synchronized void setListenerAndResult(@NotNull final TableUpdateListener @NotNull final NotificationStepReceiver resultTable) { super.setListenerAndResult(listener, resultTable); if (DEBUG) { - log.info().append("SwapListenerEx swap=") - .append(System.identityHashCode(SwapListenerEx.this)) + log.info().append("SnapshotControlEx control=") + .append(System.identityHashCode(SimpleSnapshotControlEx.this)) .append(", result=").append(System.identityHashCode(resultTable)).endl(); } } + + @Override + protected boolean isInInitialNotificationWindow() { + return extra.getLastNotificationStep() == extraLastNotificationStep && super.isInInitialNotificationWindow(); + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java index cfd6eaf99bf..83aa73b98be 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java @@ -82,8 +82,8 @@ public MemoizedOperationKey getMemoizedOperationKey() { } @Override - public SwapListener newSwapListener(QueryTable queryTable) { - return new SwapListener(queryTable) { + public SimpleSnapshotControl newSnapshotControl(QueryTable queryTable) { + return new SimpleSnapshotControl(queryTable) { @Override public synchronized boolean end(long clockCycle) { final boolean success = super.end(clockCycle); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index 363bd1ea1a2..3b544eb7c7e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -8,6 +8,7 @@ import io.deephaven.engine.rowset.TrackingWritableRowSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.TableUpdateListener; import io.deephaven.engine.table.impl.locations.*; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; @@ -264,32 +265,25 @@ protected Collection filterLocationKeys( protected final QueryTable doCoalesce() { initialize(); - final SwapListener swapListener = - createSwapListenerIfRefreshing((final BaseTable parent) -> new SwapListener(parent) { + final SimpleSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing((final BaseTable parent) -> new SimpleSnapshotControl(parent) { @Override - public void destroy() { - // NB: We can't call super.destroy() because we don't want to try to remove ourselves from the - // coalesced table (see override for removeUpdateListener), but we are probably not missing - // anything by not having super.destroy() invoke its own super.destroy(). - removeUpdateListenerUncoalesced(this); - } - - @Override - public void subscribeForUpdates() { - addUpdateListenerUncoalesced(this); + public boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) { + return addUpdateListenerUncoalesced(lastNotificationStep, listener); } }); final Mutable result = new MutableObject<>(); - initializeWithSnapshot("SourceTable.coalesce", swapListener, (usePrev, beforeClockValue) -> { + initializeWithSnapshot("SourceTable.coalesce", snapshotControl, (usePrev, beforeClockValue) -> { final QueryTable resultTable = new QueryTable(definition, rowSet, columnSourceManager.getColumnSources()); copyAttributes(resultTable, CopyAttributeOperation.Coalesce); if (rowSet.isEmpty()) { resultTable.setAttribute(INITIALLY_EMPTY_COALESCED_SOURCE_TABLE_ATTRIBUTE, true); } - if (swapListener != null) { + result.setValue(resultTable); + if (snapshotControl != null) { final ListenerImpl listener = new ListenerImpl("SourceTable.coalesce", this, resultTable) { @@ -300,10 +294,9 @@ protected void destroy() { removeUpdateListenerUncoalesced(this); } }; - swapListener.setListenerAndResult(listener, resultTable); + snapshotControl.setListenerAndResult(listener, resultTable); } - result.setValue(resultTable); return true; }); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java deleted file mode 100644 index b2c2f2ad9ca..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SwapListener.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.engine.table.impl; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.base.reference.SimpleReference; -import io.deephaven.base.reference.SwappableDelegatingReference; -import io.deephaven.base.reference.WeakSimpleReference; -import io.deephaven.configuration.Configuration; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.updategraph.LogicalClock; -import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.TableUpdateListener; -import io.deephaven.engine.updategraph.*; -import io.deephaven.internal.log.LoggerFactory; -import io.deephaven.io.logger.Logger; -import io.deephaven.engine.liveness.LivenessArtifact; -import io.deephaven.engine.table.impl.remote.ConstructSnapshot; -import org.jetbrains.annotations.NotNull; - -import javax.annotation.OverridingMethodsMustInvokeSuper; - -/** - * Watch for ticks and when initialization is complete forward to the eventual listener. - *

- * The SwapListener is attached to a table so that we can listen for updates during the UGP cycle; and if any updates - * occur, we'll be able to notice them and retry initialization. If no ticks were received before the result is ready, - * then we should forward all calls to our eventual listener. - *

- * Callers should use our start and end functions. The start function is called at the beginning of a data snapshot; and - * allows us to setup our state variables. At the end of the snapshot attempt, end() is called; and if there were no - * clock changes, we were not gotNotification, and no notifications were enqueued; then we have a successful snapshot - * and can return true. We then set the {@code eventualListener}, so that all future calls are forwarded to the - * listener, and replace our - */ -public class SwapListener extends LivenessArtifact implements TableUpdateListener { - - static final boolean DEBUG = - Configuration.getInstance().getBooleanWithDefault("SwapListener.debug", false); - static final boolean DEBUG_NOTIFICATIONS = - Configuration.getInstance().getBooleanWithDefault("SwapListener.debugNotifications", false); - - private static final Logger log = LoggerFactory.getLogger(SwapListener.class); - - /** - * The listener that will be called if this operation is successful. If we have a successful snapshot, then success - * is set to true. - */ - private TableUpdateListener eventualListener; - private NotificationStepReceiver eventualResult; - boolean success = false; - - /** - * The last clock cycle which the source table produced a notification. - */ - long lastNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; - - /** - * The sourceTable, used to get the lastNotificationTime. - */ - final BaseTable sourceTable; - - /** - * {@link WeakSimpleReference} to {@code this}, for capturing notifications from {@code sourceTable} before - * successful {@link #end(long)}. - */ - private final SimpleReference initialDelegate = new WeakSimpleReference<>(this); - /** - * {@link SwappableDelegatingReference}, to be swapped to a reference to the {@code eventualListener} upon - * successful {@link #end(long)}. - */ - private final SwappableDelegatingReference referenceForSource = - new SwappableDelegatingReference<>(initialDelegate); - - public SwapListener(final BaseTable sourceTable) { - this.sourceTable = sourceTable; - } - - public ConstructSnapshot.SnapshotControl makeSnapshotControl() { - // noinspection AutoBoxing - return ConstructSnapshot.makeSnapshotControl( - sourceTable.getUpdateGraph(), - this::start, - (final long currentClockValue, final boolean usingPreviousValues) -> isInInitialNotificationWindow(), - (final long afterClockValue, final boolean usedPreviousValues) -> end(afterClockValue)); - } - - /** - * Starts a snapshot. - * - * @param beforeClockValue the logical clock value we are starting a snapshot on - * @return true if we should use previous values, false if we should use current values. - */ - protected synchronized Boolean start(final long beforeClockValue) { - lastNotificationStep = sourceTable.getLastNotificationStep(); - success = false; - - final long beforeStep = LogicalClock.getStep(beforeClockValue); - final LogicalClock.State beforeState = LogicalClock.getState(beforeClockValue); - - final boolean idle = beforeState == LogicalClock.State.Idle; - final boolean updatedOnThisStep = beforeStep == lastNotificationStep; - final boolean satisfied; - try { - satisfied = idle || updatedOnThisStep || sourceTable.satisfied(beforeStep); - } catch (ClockInconsistencyException e) { - return null; - } - final boolean usePrev = !satisfied; - - if (DEBUG) { - log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(", swap=").append(System.identityHashCode(this)) - .append("} Start: beforeStep=").append(beforeStep) - .append(", beforeState=").append(beforeState.name()) - .append(", lastNotificationStep=").append(lastNotificationStep) - .append(", satisfied=").append(satisfied) - .append(", usePrev=").append(usePrev) - .endl(); - } - return usePrev; - } - - /** - * Ends a snapshot. Overriding methods must call {@code super} in order to ensure that the source's reference to the - * child listener is properly swapped to the eventual listener. - * - * @param clockCycle The {@link LogicalClock logical clock} cycle we are ending a snapshot on - * @return true if the snapshot was successful, false if we should try again. - * @throws IllegalStateException If the snapshot was successful (consistent), but the snapshot function failed to - * set the eventual listener or eventual result - */ - @OverridingMethodsMustInvokeSuper - protected synchronized boolean end(@SuppressWarnings("unused") final long clockCycle) { - if (isInInitialNotificationWindow()) { - if (eventualListener == null) { - throw new IllegalStateException("Listener has not been set on end!"); - } - if (eventualResult == null) { - throw new IllegalStateException("Result has not been set on end!"); - } - success = true; - } else { - success = false; - } - - if (DEBUG) { - log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(" swap=").append(System.identityHashCode(this)) - .append("} End: success=").append(success) - .append(", last=").append(lastNotificationStep) - .endl(); - } - - if (success) { - eventualResult.setLastNotificationStep(lastNotificationStep); - referenceForSource.swapDelegate(initialDelegate, eventualListener instanceof LegacyListenerAdapter - ? (LegacyListenerAdapter) eventualListener - : new WeakSimpleReference<>(eventualListener)); - forceReferenceCountToZero(); - } - - return success; - } - - /** - * Get a {@link SimpleReference} to be used in the source table's list of child listener references. - * - * @return A (swappable) {@link SimpleReference} to {@code this} - */ - public SimpleReference getReferenceForSource() { - return referenceForSource; - } - - @Override - public synchronized void onFailure( - final Throwable originalException, final Entry sourceEntry) { - // not a direct listener - throw new UnsupportedOperationException(); - } - - @Override - public synchronized NotificationQueue.ErrorNotification getErrorNotification( - final Throwable originalException, final Entry sourceEntry) { - if (!success || isInInitialNotificationWindow()) { - return new EmptyErrorNotification(); - } - return eventualListener.getErrorNotification(originalException, sourceEntry); - } - - @Override - public synchronized void onUpdate(final TableUpdate upstream) { - // not a direct listener - throw new UnsupportedOperationException(); - } - - @Override - public synchronized NotificationQueue.Notification getNotification(final TableUpdate upstream) { - if (!success || isInInitialNotificationWindow()) { - return new EmptyNotification(); - } - - final NotificationQueue.Notification notification = eventualListener.getNotification(upstream); - if (!DEBUG_NOTIFICATIONS) { - return notification; - } - - return new AbstractNotification(notification.isTerminal()) { - - @Override - public boolean canExecute(final long step) { - return notification.canExecute(step); - } - - @Override - public LogOutput append(final LogOutput logOutput) { - return logOutput.append("Wrapped(SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(" swap=").append(System.identityHashCode(SwapListener.this)) - .append("}, notification=").append(notification).append(")"); - } - - @Override - public void run() { - log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(" swap=").append(System.identityHashCode(SwapListener.this)) - .append(", clock=").append(ExecutionContext.getContext().getUpdateGraph().clock().currentStep()) - .append("} Firing notification") - .endl(); - notification.run(); - log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(" swap=").append(System.identityHashCode(SwapListener.this)) - .append("} Complete notification") - .endl(); - } - }; - } - - boolean isInInitialNotificationWindow() { - final long newNotificationStep = sourceTable.getLastNotificationStep(); - return lastNotificationStep == newNotificationStep; - } - - /** - * Set the listener that will eventually become the listener, if we have a successful swap. - * - * @param listener The listener that we will eventually forward all updates to - * @param resultTable The table that will result from this operation - */ - public synchronized void setListenerAndResult( - @NotNull final TableUpdateListener listener, - @NotNull final NotificationStepReceiver resultTable) { - eventualListener = listener; - eventualResult = resultTable; - if (DEBUG) { - log.info().append("SwapListener {source=").append(System.identityHashCode(sourceTable)) - .append(", swap=").append(System.identityHashCode(this)) - .append(", result=").append(System.identityHashCode(resultTable)) - .append('}') - .endl(); - } - } - - /** - * Invoke {@link Table#addUpdateListener(TableUpdateListener) addUpdateListener} on {@code this}. - */ - public void subscribeForUpdates() { - sourceTable.addUpdateListener(this); - } - - @Override - public void destroy() { - super.destroy(); - sourceTable.removeUpdateListener(this); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java index c11eaaf3ce4..6bd705353a3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java @@ -362,6 +362,11 @@ default void addUpdateListener(TableUpdateListener listener) { throwUnsupported(); } + @Override + default boolean addUpdateListener(final long requiredLastNotificationStep, final TableUpdateListener listener) { + return throwUnsupported(); + } + @Override default void removeUpdateListener(ShiftObliviousListener listener) { throwUnsupported(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java index 6a4fc8a9c5b..96b8dee33d9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java @@ -101,8 +101,9 @@ protected final void setCoalesced(final Table coalesced) { // region uncoalesced listeners - protected final void addUpdateListenerUncoalesced(@NotNull final TableUpdateListener listener) { - super.addUpdateListener(listener); + protected final boolean addUpdateListenerUncoalesced( + final long requiredLastNotificationStep, @NotNull final TableUpdateListener listener) { + return super.addUpdateListener(requiredLastNotificationStep, listener); } protected final void removeUpdateListenerUncoalesced(@NotNull final TableUpdateListener listener) { @@ -457,10 +458,15 @@ public void addUpdateListener(ShiftObliviousListener listener, boolean replayIni } @Override - public void addUpdateListener(TableUpdateListener listener) { + public void addUpdateListener(@NotNull TableUpdateListener listener) { coalesce().addUpdateListener(listener); } + @Override + public boolean addUpdateListener(long requiredLastNotificationStep, @NotNull TableUpdateListener listener) { + return coalesce().addUpdateListener(requiredLastNotificationStep, listener); + } + @Override public void removeUpdateListener(ShiftObliviousListener listener) { coalesce().removeUpdateListener(listener); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index dc713dee126..d12c6a11c79 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -116,11 +116,12 @@ public static QueryTable aggregation( } } final Mutable resultHolder = new MutableObject<>(); - final SwapListener swapListener = input.createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + input.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); BaseTable.initializeWithSnapshot( - "by(" + aggregationContextFactory + ", " + groupByColumns + ")", swapListener, + "by(" + aggregationContextFactory + ", " + groupByColumns + ")", snapshotControl, (usePrev, beforeClockValue) -> { - resultHolder.setValue(aggregation(control, swapListener, aggregationContextFactory, + resultHolder.setValue(aggregation(control, snapshotControl, aggregationContextFactory, input, preserveEmpty, initialKeys, keyNames, usePrev)); return true; }); @@ -129,7 +130,7 @@ public static QueryTable aggregation( private static QueryTable aggregation( @NotNull final AggregationControl control, - @Nullable final SwapListener swapListener, + @Nullable final SimpleSnapshotControl snapshotControl, @NotNull final AggregationContextFactory aggregationContextFactory, @NotNull final QueryTable input, final boolean preserveEmpty, @@ -140,7 +141,7 @@ private static QueryTable aggregation( // This should be checked before this method is called, but let's verify here in case an additional // entry point is added incautiously. Assert.eqNull(initialKeys, "initialKeys"); - return noKeyAggregation(swapListener, aggregationContextFactory, input, preserveEmpty, usePrev); + return noKeyAggregation(snapshotControl, aggregationContextFactory, input, preserveEmpty, usePrev); } final ColumnSource[] keySources = @@ -250,7 +251,7 @@ private static QueryTable aggregation( final TableUpdateListener listener = new BaseTable.ListenerImpl("by(" + aggregationContextFactory + ")", input, result) { @ReferentialIntegrity - final SwapListener swapListenerHardReference = swapListener; + final SimpleSnapshotControl swapListenerHardReference = snapshotControl; final ModifiedColumnSet keysUpstreamModifiedColumnSet = input.newModifiedColumnSet(keyNames); final ModifiedColumnSet[] operatorInputModifiedColumnSets = @@ -296,7 +297,7 @@ public void onFailureInternal(@NotNull final Throwable originalException, Entry } }; - swapListener.setListenerAndResult(listener, result); + snapshotControl.setListenerAndResult(listener, result); } return ac.transformResult(result); @@ -1909,7 +1910,7 @@ private static void copyKeyColumns(ColumnSource[] keyColumnsRaw, WritableColu } private static QueryTable noKeyAggregation( - @Nullable final SwapListener swapListener, + @Nullable final SimpleSnapshotControl snapshotControl, @NotNull final AggregationContextFactory aggregationContextFactory, @NotNull final QueryTable table, final boolean preserveEmpty, @@ -2102,7 +2103,7 @@ public void onFailureInternal(@NotNull final Throwable originalException, super.onFailureInternal(originalException, sourceEntry); } }; - swapListener.setListenerAndResult(listener, result); + snapshotControl.setListenerAndResult(listener, result); } ac.supplyRowLookup(() -> key -> Arrays.equals((Object[]) key, EMPTY_KEY) ? 0 : DEFAULT_UNKNOWN_ROW); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/TreeTableFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/TreeTableFilter.java index c4b58b165b8..36d682c1e93 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/TreeTableFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/TreeTableFilter.java @@ -159,12 +159,11 @@ private TreeTableFilter(@NotNull final TreeTableImpl tree, @NotNull final WhereF if (source.isRefreshing()) { try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph( source.getUpdateGraph()).open()) { - final SwapListenerEx swapListener = new SwapListenerEx(source, sourceRowLookup); - source.addUpdateListener(swapListener); + final SimpleSnapshotControlEx snapshotControl = new SimpleSnapshotControlEx(source, sourceRowLookup); ConstructSnapshot.callDataSnapshotFunction(System.identityHashCode(source) + ": ", - swapListener.makeSnapshotControl(), + snapshotControl, (usePrev, beforeClockValue) -> { - doInitialFilter(swapListener, usePrev); + doInitialFilter(snapshotControl, usePrev); return true; }); } @@ -173,7 +172,7 @@ private TreeTableFilter(@NotNull final TreeTableImpl tree, @NotNull final WhereF } } - private void doInitialFilter(@Nullable final SwapListener swapListener, final boolean usePrev) { + private void doInitialFilter(@Nullable final SimpleSnapshotControl snapshotControl, final boolean usePrev) { try (final RowSet sourcePrevRows = usePrev ? source.getRowSet().copyPrev() : null) { final RowSet sourceRows = usePrev ? sourcePrevRows : source.getRowSet(); @@ -186,9 +185,9 @@ private void doInitialFilter(@Nullable final SwapListener swapListener, final bo } result = source.getSubTable(resultRows); - if (swapListener != null) { + if (snapshotControl != null) { sourceListener = new Listener(); - swapListener.setListenerAndResult(sourceListener, result); + snapshotControl.setListenerAndResult(sourceListener, result); result.addParentReference(sourceListener); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 82863b918fa..30d0beae1b2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -29,9 +29,10 @@ import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.SortingOrder; -import io.deephaven.engine.table.impl.SwapListener; +import io.deephaven.engine.table.impl.SimpleSnapshotControl; import io.deephaven.engine.table.impl.by.AggregationProcessor; import io.deephaven.engine.table.impl.join.dupcompact.DupCompactKernel; +import io.deephaven.engine.table.impl.remote.ConstructSnapshot; import io.deephaven.engine.table.impl.sort.IntSortKernel; import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; import io.deephaven.engine.table.impl.sources.IntegerSparseArraySource; @@ -233,8 +234,8 @@ public boolean snapshotNeeded() { } @Override - public SwapListener newSwapListener(@NotNull final QueryTable queryTable) { - // Since this operation never needs a snapshot, it does not need to support creating a SwapListener. + public SimpleSnapshotControl newSnapshotControl(@NotNull final QueryTable queryTable) { + // Since this operation never needs a snapshot, it does not need to support creating a SnapshotControl. throw new UnsupportedOperationException(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java index ecea08130cd..c1168185dc2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java @@ -248,19 +248,19 @@ public final Table getSymbolTable(@NotNull final QueryTable sourceTable, final b return sourceTable.memoizeResult(MemoizedOperationKey.symbolTable(this, useLookupCaching), () -> { final String description = "getSymbolTable(" + sourceTable.getDescription() + ", " + useLookupCaching + ')'; return QueryPerformanceRecorder.withNugget(description, sourceTable.size(), () -> { - final SwapListener swapListener = - sourceTable.createSwapListenerIfRefreshing(SwapListener::new); + final SimpleSnapshotControl snapshotControl = + sourceTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); final Mutable

result = new MutableObject<>(); - BaseTable.initializeWithSnapshot(description, swapListener, + BaseTable.initializeWithSnapshot(description, snapshotControl, (final boolean usePrev, final long beforeClockValue) -> { final QueryTable symbolTable; - if (swapListener == null) { + if (snapshotControl == null) { symbolTable = getStaticSymbolTable(sourceTable.getRowSet(), useLookupCaching); } else { symbolTable = getStaticSymbolTable( usePrev ? sourceTable.getRowSet().copyPrev() : sourceTable.getRowSet(), useLookupCaching); - swapListener.setListenerAndResult( + snapshotControl.setListenerAndResult( new SymbolTableUpdateListener(description, sourceTable, symbolTable), symbolTable); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/AddsToRingsListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/AddsToRingsListener.java index 01bd9317c34..af460b0a175 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/AddsToRingsListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/AddsToRingsListener.java @@ -4,7 +4,6 @@ package io.deephaven.engine.table.impl.sources.ring; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; @@ -15,7 +14,7 @@ import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.SwapListener; +import io.deephaven.engine.table.impl.SimpleSnapshotControl; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.updategraph.UpdateCommitter; @@ -29,8 +28,8 @@ enum Init { NONE, FROM_PREVIOUS, FROM_CURRENT } - static Table of(SwapListener swapListener, Table parent, int capacity, Init init) { - if (swapListener == null && init == Init.NONE) { + static Table of(SimpleSnapshotControl snapshotControl, Table parent, int capacity, Init init) { + if (snapshotControl == null && init == Init.NONE) { throw new IllegalArgumentException(String.format( "Trying to initialize %s against a static table, but init=NONE; no data will be filled in this case.", AddsToRingsListener.class.getName())); @@ -72,15 +71,11 @@ static Table of(SwapListener swapListener, Table parent, int capacity, Init init final WritableRowSet initialRowSet = init(init, parent, sources, sourceHasUnboundedFillContexts, rings); final QueryTable result = new QueryTable(initialRowSet.toTracking(), resultMap); - if (swapListener == null) { - result.setRefreshing(false); - } else { + if (snapshotControl != null) { result.setRefreshing(true); - } - final AddsToRingsListener listener = new AddsToRingsListener( - "AddsToRingsListener", parent, result, sources, sourceHasUnboundedFillContexts, rings); - if (swapListener != null) { - swapListener.setListenerAndResult(listener, result); + final AddsToRingsListener listener = new AddsToRingsListener( + "AddsToRingsListener", parent, result, sources, sourceHasUnboundedFillContexts, rings); + snapshotControl.setListenerAndResult(listener, result); } return result; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/RingTableTools.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/RingTableTools.java index 153e8bd8287..00827314db5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/RingTableTools.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ring/RingTableTools.java @@ -8,7 +8,7 @@ import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.engine.table.impl.BlinkTableTools; -import io.deephaven.engine.table.impl.SwapListener; +import io.deephaven.engine.table.impl.SimpleSnapshotControl; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction; import io.deephaven.engine.table.impl.sources.ring.AddsToRingsListener.Init; @@ -44,9 +44,10 @@ public static Table of(Table parent, int capacity) { */ public static Table of(Table parent, int capacity, boolean initialize) { return QueryPerformanceRecorder.withNugget("RingTableTools.of", () -> { - final BaseTable baseTable = (BaseTable) parent.coalesce(); - final SwapListener swapListener = baseTable.createSwapListenerIfRefreshing(SwapListener::new); - return new RingTableSnapshotFunction(baseTable, capacity, initialize, swapListener).constructResults(); + final BaseTable baseTable = (BaseTable) parent.coalesce(); + final SimpleSnapshotControl snapshotControl = + baseTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); + return new RingTableSnapshotFunction(baseTable, capacity, initialize, snapshotControl).constructResults(); }); } @@ -70,10 +71,11 @@ public static Table of2(Table parent, int capacity, boolean initialize) { return QueryPerformanceRecorder.withNugget("RingTableTools.of2", () -> { // todo: there is probably a better way to do this final int capacityPowerOf2 = capacity == 1 ? 1 : Integer.highestOneBit(capacity - 1) << 1; - final BaseTable baseTable = (BaseTable) parent.coalesce(); - final SwapListener swapListener = baseTable.createSwapListenerIfRefreshing(SwapListener::new); + final BaseTable baseTable = (BaseTable) parent.coalesce(); + final SimpleSnapshotControl snapshotControl = + baseTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new); final Table tablePowerOf2 = - new RingTableSnapshotFunction(baseTable, capacityPowerOf2, initialize, swapListener) + new RingTableSnapshotFunction(baseTable, capacityPowerOf2, initialize, snapshotControl) .constructResults(); return capacityPowerOf2 == capacity ? tablePowerOf2 : tablePowerOf2.tail(capacity); }); @@ -83,21 +85,23 @@ private static class RingTableSnapshotFunction implements SnapshotFunction { private final Table parent; private final int capacity; private final boolean initialize; - private final SwapListener swapListener; + private final SimpleSnapshotControl snapshotControl; private Table results; - public RingTableSnapshotFunction(Table parent, int capacity, boolean initialize, SwapListener swapListener) { + public RingTableSnapshotFunction( + Table parent, int capacity, boolean initialize, SimpleSnapshotControl snapshotControl) { this.parent = Objects.requireNonNull(parent); this.capacity = capacity; this.initialize = initialize; - this.swapListener = swapListener; + this.snapshotControl = snapshotControl; } public Table constructResults() { try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(parent.getUpdateGraph()).open()) { - BaseTable.initializeWithSnapshot(RingTableSnapshotFunction.class.getSimpleName(), swapListener, this); + BaseTable.initializeWithSnapshot( + RingTableSnapshotFunction.class.getSimpleName(), snapshotControl, this); } return Objects.requireNonNull(results); } @@ -105,7 +109,7 @@ public Table constructResults() { @Override public boolean call(boolean usePrev, long beforeClockValue) { final Init init = !initialize ? Init.NONE : usePrev ? Init.FROM_PREVIOUS : Init.FROM_CURRENT; - results = AddsToRingsListener.of(swapListener, parent, capacity, init); + results = AddsToRingsListener.of(snapshotControl, parent, capacity, init); return true; } } diff --git a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java index abf3956ebd9..1ebbf838b28 100644 --- a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java +++ b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java @@ -10,7 +10,7 @@ import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; import io.deephaven.engine.table.impl.NotificationStepReceiver; -import io.deephaven.engine.table.impl.SwapListener; +import io.deephaven.engine.table.impl.SimpleSnapshotControl; import io.deephaven.engine.table.impl.UncoalescedTable; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.hash.KeyedLongObjectHashMap; @@ -139,15 +139,14 @@ private synchronized void onNewTableExport(final Ticket ticket, final int export return; } - final SwapListener swapListener = new SwapListener(table); - swapListener.subscribeForUpdates(); + final SimpleSnapshotControl snapshotControl = new SimpleSnapshotControl(table); final ListenerImpl listener = new ListenerImpl(table, exportId); listener.tryRetainReference(); updateListenerMap.put(exportId, listener); final MutableLong initSize = new MutableLong(); - BaseTable.initializeWithSnapshot(logPrefix, swapListener, (usePrev, beforeClockValue) -> { - swapListener.setListenerAndResult(listener, NOOP_NOTIFICATION_STEP_RECEIVER); + BaseTable.initializeWithSnapshot(logPrefix, snapshotControl, (usePrev, beforeClockValue) -> { + snapshotControl.setListenerAndResult(listener, NOOP_NOTIFICATION_STEP_RECEIVER); final TrackingRowSet rowSet = table.getRowSet(); initSize.setValue(usePrev ? rowSet.sizePrev() : rowSet.size()); return true;