Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDART-973: Add support for the new progress notifications #1546

Merged
merged 22 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Add better hint to error message, if opening native library fails. (Issue [#1595](https://github.com/realm/realm-dart/issues/1595))
* Added support for specifying schema version on `Configuration.flexibleSync`. This allows you to take advantage of an upcoming server-side feature that will allow schema migrations for synchronized Realms. (Issue [#1599](https://github.com/realm/realm-dart/issues/1599))
* The default base url in `AppConfiguration` has been updated to point to `services.cloud.mongodb.com`. See https://www.mongodb.com/docs/atlas/app-services/domain-migration/ for more information. (Issue [#1549](https://github.com/realm/realm-dart/issues/1549))
* The download progress estimate reported by `Session.getProgressStream` will now return meaningful estimated values, while previously it always returned 1. (Issue [#1564](https://github.com/realm/realm-dart/issues/1564))

### Fixed
* Using valid const, but non-literal expressions, such as negation of numbers, as an initializer would fail. (Issue [#1606](https://github.com/realm/realm-dart/issues/1606))
Expand Down
2 changes: 1 addition & 1 deletion packages/realm_dart/lib/src/native/realm_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2595,7 +2595,7 @@ class _RealmCore {
static void _syncProgressCallback(Object userdata, int transferred, int transferable, double estimate) {
final controller = userdata as ProgressNotificationsController;

controller.onProgress(transferred, transferable);
controller.onProgress(estimate);
}

RealmSyncSessionConnectionStateNotificationTokenHandle sessionRegisterConnectionStateNotifier(Session session, SessionConnectionStateController controller) {
Expand Down
4 changes: 2 additions & 2 deletions packages/realm_dart/lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,8 @@ class RealmAsyncOpenProgressNotificationsController implements ProgressNotificat
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes));
void onProgress(double progressEstimate) {
_streamController.add(SessionInternal.createSyncProgress(progressEstimate));
}

void _start() {
Expand Down
21 changes: 6 additions & 15 deletions packages/realm_dart/lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,7 @@ class SyncProgress {
/// value may either increase or decrease as new data needs to be transferred.
final double progressEstimate;

const SyncProgress._({required this.progressEstimate});

static double _calculateProgress({required int transferred, required int transferable}) {
if (transferable == 0 || transferred > transferable) {
return 1;
}

return transferred / transferable;
}
const SyncProgress({required this.progressEstimate});
}

/// A type containing information about the transition of a connection state from one value to another.
Expand Down Expand Up @@ -117,12 +109,11 @@ extension SessionInternal on Session {
realmCore.raiseError(this, errorCode, isFatal);
}

static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) =>
SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes));
static SyncProgress createSyncProgress(double progressEstimate) => SyncProgress(progressEstimate: progressEstimate);
}

abstract interface class ProgressNotificationsController {
void onProgress(int transferredBytes, int transferableBytes);
void onProgress(double progressEstimate);
}

/// @nodoc
Expand All @@ -142,10 +133,10 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)));
void onProgress(double progressEstimate) {
_streamController.add(SyncProgress(progressEstimate: progressEstimate));

if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) {
if (progressEstimate >= 1.0 && _mode == ProgressMode.forCurrentlyOutstandingWork) {
_streamController.close();
}
}
Expand Down
12 changes: 6 additions & 6 deletions packages/realm_dart/test/realm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1328,17 +1328,17 @@ void main() {
final user = await app.logIn(credentials);
final configuration = Configuration.flexibleSync(user, getSyncSchema());

double progress = -1;
double progressEstimate = -1;
final completer = Completer<void>();
var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) {
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
if (syncProgress.progressEstimate == 1.0) {
completer.complete();
}
});
completer.future.timeout(Duration(milliseconds: 300), onTimeout: () => throw Exception("onProgressCallback did not happen."));
expect(syncedRealm.isClosed, false);
expect(progress, greaterThan(-1));
expect(progressEstimate, 1.0);
});

baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async {
Expand All @@ -1357,16 +1357,16 @@ void main() {
final config = await _subscribeForAtlasAddedData(app);

int printCount = 0;
double progress = 0;
double progressEstimate = 0;

final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) {
printCount++;
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
});

expect(syncedRealm.isClosed, false);
expect(printCount, isNot(0));
expect(progress, 1.0);
expect(progressEstimate, 1.0);
});

baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async {
Expand Down
91 changes: 80 additions & 11 deletions packages/realm_dart/test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ void main() {
StreamProgressData subscribeToProgress(Realm realm, ProgressDirection direction, ProgressMode mode) {
final data = StreamProgressData();
final stream = realm.syncSession.getProgressStream(direction, mode);

data.subscription = stream.listen((event) {
if (mode == ProgressMode.forCurrentlyOutstandingWork) {
expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate));
Expand Down Expand Up @@ -191,28 +192,97 @@ void main() {

baasTest('SyncSession.getProgressStream forCurrentlyOutstandingWork', (configuration) async {
final differentiator = ObjectId();
final realmA = await getIntegrationRealm(differentiator: differentiator);
final realmB = await getIntegrationRealm(differentiator: differentiator);
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

for (var i = 0; i < 10; i++) {
realmA.write(() {
realmA.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(realmA, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);

await realmA.syncSession.waitForUpload();
final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

// Subscribe immediately after the upload to ensure we get the entire upload message as progress notifications
final downloadData = subscribeToProgress(realmB, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);
final downloadRealm = await getIntegrationRealm(differentiator: differentiator, waitForSync: false);
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);

await downloadRealm.subscriptions.waitForSynchronization();

await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: true);

// We should not see more updates in either direction
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, downloadCallbacks);
Comment on lines +217 to +230
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate the explanation of this part a bit? Why will we not see more upload callbacks here, when we add and wait for upload?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the subscription we setup is for ProgressMode.forCurrentlyOutstandingWork, which ends when you've reached 100% and no further callbacks will be invoked as more data comes in/goes out.


await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});

baasTest('SyncSession.getProgressStream after reconnecting', (configuration) async {
final differentiator = ObjectId();
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

// Make sure we've caught up, then close the Realm. We'll reopen it later and verify that progress notifications
// are delivered. This is different from "SyncSession.getProgressStream forCurrentlyOutstandingWork" where we're
// testing notifications after change of query.
final user = await getIntegrationUser(appConfig: configuration);
final config = getIntegrationConfig(user);
var downloadRealm = getRealm(config);
downloadRealm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(downloadRealm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await downloadRealm.subscriptions.waitForSynchronization();
downloadRealm.close();

for (var i = 0; i < 10; i++) {
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

await realmB.syncSession.waitForDownload();
// Reopen the download realm and subscribe for notifications - those should still be delivered as normal.
downloadRealm = Realm(getIntegrationConfig(user));
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);

await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: true);

// We should not see more updates in either direction
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, downloadCallbacks);

await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});
Expand Down Expand Up @@ -256,7 +326,6 @@ void main() {
expect(downloadData.progressEstimate, 1.0);

expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked));

expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked));

await uploadData.subscription.cancel();
Expand Down Expand Up @@ -321,7 +390,7 @@ class StreamProgressData {
bool doneInvoked;
late StreamSubscription<SyncProgress> subscription;

StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false});
StreamProgressData({this.progressEstimate = -1, this.callbacksInvoked = 0, this.doneInvoked = false});

StreamProgressData.snapshot(StreamProgressData other)
: this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate);
Expand Down
15 changes: 11 additions & 4 deletions packages/realm_dart/test/test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ void setupTests() {

Realm.logger.setLogLevel(LogLevel.detail);
Realm.logger.onRecord.listen((record) {
testing.printOnFailure('${record.category} ${record.level.name}: ${record.message}');
testing.printOnFailure('${DateTime.now().toUtc()} ${record.category} ${record.level.name}: ${record.message}');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

});

// Enable this to print platform info, including current PID
Expand Down Expand Up @@ -606,17 +606,24 @@ Future<User> getAnonymousUser(App app) {
return app.logIn(Credentials.anonymous(reuseCredentials: false));
}

Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig}) async {
FlexibleSyncConfiguration getIntegrationConfig(User user) {
return Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
}

Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig, bool waitForSync = true}) async {
app ??= App(appConfig ?? await baasHelper!.getAppConfig());
final user = await getIntegrationUser(app: app, appConfig: appConfig);

final config = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
final config = getIntegrationConfig(user);
final realm = getRealm(config);
if (differentiator != null) {
realm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(realm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await realm.subscriptions.waitForSynchronization();
if (waitForSync) {
await realm.subscriptions.waitForSynchronization();
}
}

return realm;
Expand Down
Loading