Skip to content

Commit

Permalink
BaseTable: Remove Swap Listener and Add Atomic addUpdateListener
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Oct 17, 2023
1 parent 4f43d53 commit 78fdc01
Show file tree
Hide file tree
Showing 22 changed files with 415 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ public Table apply(final Table wholeTable) {
// TODO restore this to support non-QueryTable types
// if (wholeTable instanceof BaseTable) {
// BaseTable baseTable = (BaseTable) wholeTable;
// final SwapListener swapListener =
// baseTable.createSwapListenerIfRefreshing(SwapListener::new);
// final SimpleSnapshotControl snapshotControl =
// baseTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new);
//
// final Mutable<QueryTable> result = new MutableObject<>();
//
// baseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> {
// baseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> {
// final boolean usePrev = prevRequested && baseTable.isRefreshing();
// final WritableRowSet rowSetToUse = usePrev ? baseTable.build().copyPrev() : baseTable.build();
//
Expand All @@ -136,20 +136,20 @@ public Table apply(final Table wholeTable) {
}

private static Table makeDownsampledQueryTable(final QueryTable wholeQueryTable, final DownsampleKey memoKey) {
final SwapListener swapListener =
wholeQueryTable.createSwapListenerIfRefreshing(SwapListener::new);
final SimpleSnapshotControl snapshotControl =
wholeQueryTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new);

final Mutable<Table> result = new MutableObject<>();

BaseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> {
BaseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> {
final boolean usePrev = prevRequested && wholeQueryTable.isRefreshing();

final DownsamplerListener downsampleListener = DownsamplerListener.of(wholeQueryTable, memoKey);
downsampleListener.init(usePrev);
result.setValue(downsampleListener.resultTable);

if (swapListener != null) {
swapListener.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
if (snapshotControl != null) {
snapshotControl.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
downsampleListener.resultTable.addParentReference(downsampleListener);
}

Expand Down Expand Up @@ -694,7 +694,7 @@ private void performRescans(final DownsampleChunkContext context) {
/**
* Indicates that a change has probably happened and we should notify the result table. The contents of the
* change will be our state map (i.e. there is
*
*
* @param upstream the change that happened upstream
* @param lastRowSet the base rowSet to use when considering what items to tell the result table changed. if
* this.rowSet, then update it normally, otherwise this.rowSet must be empty and this.rowSet should be
Expand Down
10 changes: 10 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ RollupTable rollup(Collection<? extends Aggregation> aggregations, boolean inclu
*/
void addUpdateListener(TableUpdateListener listener);

/**
* Subscribe for updates to this table. {@code listener} will be invoked via the {@link NotificationQueue}
* associated with this Table.
*
* @param requiredLastNotificationStep fail to add update listener if the last notification step is not this
* @param listener listener for updates
* @return true if the listener was added, false if the last notification step requirement was not met
*/
boolean addUpdateListener(final long requiredLastNotificationStep, final TableUpdateListener listener);

/**
* Unsubscribe the supplied listener.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ public static Table toBlink(@NotNull final Table table) {
final MutableObject<QueryTable> resultHolder = new MutableObject<>();
final MutableObject<AddOnlyToBlinkListener> listenerHolder = new MutableObject<>();
final BaseTable<?> coalesced = (BaseTable<?>) table.coalesce();
final SwapListener swapListener = coalesced.createSwapListenerIfRefreshing(SwapListener::new);
final SimpleSnapshotControl snapshotControl =
coalesced.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new);

// noinspection DataFlowIssue swapListener cannot be null here, since we know the table is refreshing
ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", swapListener.makeSnapshotControl(),
ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", snapshotControl,
(final boolean usePrev, final long beforeClockValue) -> {
// Start with the same rows as the original table
final TrackingRowSet resultRowSet = usePrev
Expand All @@ -69,7 +70,7 @@ public static Table toBlink(@NotNull final Table table) {
final AddOnlyToBlinkListener listener = new AddOnlyToBlinkListener(recorder, result);
recorder.setMergedListener(listener);
result.addParentReference(listener);
swapListener.setListenerAndResult(recorder, result);
snapshotControl.setListenerAndResult(recorder, result);

listenerHolder.setValue(listener);
resultHolder.setValue(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public void addUpdateListener(final ShiftObliviousListener listener, final boole
}

@Override
public void addUpdateListener(final TableUpdateListener listener) {
public void addUpdateListener(@NotNull final TableUpdateListener listener) {
if (isFailed) {
throw new IllegalStateException("Can not listen to failed table " + description);
}
Expand All @@ -570,14 +570,38 @@ public void addUpdateListener(final TableUpdateListener listener) {
}
}

@Override
public boolean addUpdateListener(
final long requiredLastNotificationStep, @NotNull final TableUpdateListener listener) {
if (isFailed) {
throw new IllegalStateException("Can not listen to failed table " + description);
}

if (!isRefreshing()) {
return false;
}

synchronized (this) {
if (this.lastNotificationStep != requiredLastNotificationStep) {
return false;
}

// ensure that listener is in the same update graph if applicable
if (listener instanceof NotificationQueue.Dependency) {
getUpdateGraph((NotificationQueue.Dependency) listener);
}
ensureChildListenerReferences().add(listener);

return true;
}
}

private SimpleReferenceManager<TableUpdateListener, ? extends SimpleReference<TableUpdateListener>> ensureChildListenerReferences() {
// noinspection unchecked
return FieldUtils.ensureField(this, CHILD_LISTENER_REFERENCES_UPDATER, EMPTY_CHILD_LISTENER_REFERENCES,
() -> new SimpleReferenceManager<>((final TableUpdateListener tableUpdateListener) -> {
if (tableUpdateListener instanceof LegacyListenerAdapter) {
return (LegacyListenerAdapter) tableUpdateListener;
} else if (tableUpdateListener instanceof SwapListener) {
return ((SwapListener) tableUpdateListener).getReferenceForSource();
} else {
return new WeakSimpleReference<>(tableUpdateListener);
}
Expand Down Expand Up @@ -701,12 +725,14 @@ public final void notifyListeners(final TableUpdate update) {
validateUpdateOverlaps(update);
}

lastNotificationStep = currentStep;

// notify children
final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
synchronized (this) {
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
}

update.release();
}
Expand Down Expand Up @@ -813,11 +839,14 @@ public final void notifyListenersOnError(final Throwable e, @Nullable final Tabl

isFailed = true;
maybeSignal();
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
synchronized (this) {
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
}
}

/**
Expand Down Expand Up @@ -1261,34 +1290,35 @@ public Table setTotalsTable(String directive) {
}

public static void initializeWithSnapshot(
String logPrefix, SwapListener swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (swapListener == null) {
@NotNull final String logPrefix,
@Nullable final ConstructSnapshot.SnapshotControl snapshotControl,
@NotNull final ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (snapshotControl == null) {
snapshotFunction.call(false, LogicalClock.NULL_CLOCK_VALUE);
return;
}
ConstructSnapshot.callDataSnapshotFunction(logPrefix, swapListener.makeSnapshotControl(), snapshotFunction);
ConstructSnapshot.callDataSnapshotFunction(logPrefix, snapshotControl, snapshotFunction);
}

public interface SwapListenerFactory<T extends SwapListener> {
T newListener(BaseTable<?> sourceTable);
public interface SnapshotControlFactory<T extends ConstructSnapshot.SnapshotControl> {
T newControl(BaseTable<?> sourceTable);
}

/**
* If we are a refreshing table, then we should create a swap listener that listens for updates to this table.
*
* If we are a refreshing table, then we should create a snapshot control to validate the snapshot.
* <p>
* Otherwise, we return null.
*
* @return a swap listener for this table (or null)
* @return a snapshot control to snapshot this table (or null)
*/
@Nullable
public <T extends SwapListener> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
public <T extends SimpleSnapshotControl> T createSnapshotControlIfRefreshing(
final SnapshotControlFactory<T> factory) {
if (!isRefreshing()) {
return null;
}

final T swapListener = factory.newListener(this);
swapListener.subscribeForUpdates();
return swapListener;
return factory.newControl(this);
}

// ------------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size
}

final BaseTable<?> baseBlinkTable = (BaseTable<?>) blinkTable.coalesce();
final SwapListener swapListener = baseBlinkTable.createSwapListenerIfRefreshing(SwapListener::new);
final SimpleSnapshotControl snapshotControl =
baseBlinkTable.createSnapshotControlIfRefreshing(SimpleSnapshotControl::new);
// blink tables must tick
Assert.neqNull(swapListener, "swapListener");
Assert.neqNull(snapshotControl, "snapshotControl");

final Mutable<QueryTable> resultHolder = new MutableObject<>();

ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", swapListener.makeSnapshotControl(),
ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", snapshotControl,
(boolean usePrev, long beforeClockValue) -> {
final Map<String, WritableColumnSource<?>> columns = new LinkedHashMap<>();
final Map<String, ? extends ColumnSource<?>> columnSourceMap =
Expand Down Expand Up @@ -119,7 +120,7 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size

Assert.leq(result.size(), "result.size()", sizeLimit, "sizeLimit");

swapListener.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly",
snapshotControl.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly",
baseBlinkTable, result) {
@Override
public void onUpdate(TableUpdate upstream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.deephaven.engine.table.impl;

/**
* Used by {@link SwapListener swap listeners} to set the notification step of elements in our DAG.
* Used by {@link SimpleSnapshotControl snapshot control} to set the notification step of elements in our DAG.
*/
public interface NotificationStepReceiver {

Expand Down
Loading

0 comments on commit 78fdc01

Please sign in to comment.