Skip to content

Commit

Permalink
Add retry functionality to firestore streams (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
fum36205 authored Feb 22, 2024
1 parent 0e9e1ff commit 4c586dd
Showing 1 changed file with 110 additions and 95 deletions.
205 changes: 110 additions & 95 deletions lib/firestore/firestore_gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,79 +11,12 @@ import '../firedart.dart';
typedef RequestAuthenticator = Future<void>? Function(
Map<String, String> metadata, String uri);

class _FirestoreGatewayStreamCache {
void Function(String userInfo)? onDone;
String userInfo;
void Function(Object e) onError;

StreamController<ListenRequest>? _listenRequestStreamController;
late StreamController<ListenResponse> _listenResponseStreamController;
late Map<String, Document> _documentMap;

late bool _shouldCleanup;

Stream<ListenResponse> get stream => _listenResponseStreamController.stream;

Map<String, Document> get documentMap => _documentMap;

_FirestoreGatewayStreamCache(
{this.onDone, required this.userInfo, Function(Object e)? onError})
: onError = onError ?? _handleErrorStub;

void setListenRequest(
ListenRequest request, FirestoreClient client, String database) {
// Close the request stream if this function is called for a second time;
_listenRequestStreamController?.close();

_documentMap = <String, Document>{};
_listenRequestStreamController = StreamController<ListenRequest>();
_listenResponseStreamController =
StreamController<ListenResponse>.broadcast(
onListen: _handleListenOnResponseStream,
onCancel: _handleCancelOnResponseStream);
_listenResponseStreamController.addStream(client
.listen(_listenRequestStreamController!.stream,
options: CallOptions(
metadata: {'google-cloud-resource-prefix': database}))
.handleError(onError));
_listenRequestStreamController!.add(request);
}

void close() {
_listenRequestStreamController?.close();
_listenResponseStreamController.close();
}

void _handleListenOnResponseStream() {
_shouldCleanup = false;
}

void _handleCancelOnResponseStream() {
// Clean this up in the future
_shouldCleanup = true;
Future.microtask(_handleDone);
}

void _handleDone() {
if (!_shouldCleanup) {
return;
}
onDone?.call(userInfo);
// Clean up stream resources
_listenRequestStreamController!.close();
}

static void _handleErrorStub(e) {
throw e;
}
}

class FirestoreGateway {
final RequestAuthenticator? _authenticator;

final String database;

final Map<String, _FirestoreGatewayStreamCache> _listenRequestStreamMap;
final Map<String, _ListenStreamWrapper> _listenStreamCache;

late FirestoreClient _client;

Expand All @@ -97,7 +30,7 @@ class FirestoreGateway {
}) : _authenticator = authenticator,
database =
'projects/$projectId/databases/${databaseId ?? '(default)'}/documents',
_listenRequestStreamMap = <String, _FirestoreGatewayStreamCache>{} {
_listenStreamCache = <String, _ListenStreamWrapper>{} {
_setupClient(emulator: emulator);
}

Expand All @@ -116,8 +49,8 @@ class FirestoreGateway {
}

Stream<List<Document>> streamCollection(String path) {
if (_listenRequestStreamMap.containsKey(path)) {
return _mapCollectionStream(_listenRequestStreamMap[path]!);
if (_listenStreamCache.containsKey(path)) {
return _mapCollectionStream(_listenStreamCache[path]!);
}

var selector = StructuredQuery_CollectionSelector()
Expand All @@ -131,13 +64,14 @@ class FirestoreGateway {
..database = database
..addTarget = target;

final listenRequestStream = _FirestoreGatewayStreamCache(
onDone: _handleDone, userInfo: path, onError: _handleError);
_listenRequestStreamMap[path] = listenRequestStream;

listenRequestStream.setListenRequest(request, _client, database);
_listenStreamCache[path] = _ListenStreamWrapper.create(
request,
(requestStream) => _client.listen(requestStream,
options: CallOptions(
metadata: {'google-cloud-resource-prefix': database})),
onDone: () => _listenStreamCache.remove(path));

return _mapCollectionStream(listenRequestStream);
return _mapCollectionStream(_listenStreamCache[path]!);
}

Future<Document> createDocument(
Expand Down Expand Up @@ -184,8 +118,8 @@ class FirestoreGateway {
.catchError(_handleError);

Stream<Document?> streamDocument(String path) {
if (_listenRequestStreamMap.containsKey(path)) {
return _mapDocumentStream(_listenRequestStreamMap[path]!);
if (_listenStreamCache.containsKey(path)) {
return _mapDocumentStream(_listenStreamCache[path]!.stream);
}

final documentsTarget = Target_DocumentsTarget()..documents.add(path);
Expand All @@ -194,13 +128,14 @@ class FirestoreGateway {
..database = database
..addTarget = target;

final listenRequestStream = _FirestoreGatewayStreamCache(
onDone: _handleDone, userInfo: path, onError: _handleError);
_listenRequestStreamMap[path] = listenRequestStream;

listenRequestStream.setListenRequest(request, _client, database);
_listenStreamCache[path] = _ListenStreamWrapper.create(
request,
(requestStream) => _client.listen(requestStream,
options: CallOptions(
metadata: {'google-cloud-resource-prefix': database})),
onDone: () => _listenStreamCache.remove(path));

return _mapDocumentStream(listenRequestStream);
return _mapDocumentStream(_listenStreamCache[path]!.stream);
}

Future<List<Document>> runQuery(
Expand All @@ -216,16 +151,16 @@ class FirestoreGateway {
}

void close() {
_listenRequestStreamMap.forEach((_, stream) => stream.close());
_listenRequestStreamMap.clear();
_listenStreamCache.forEach((_, stream) => stream.close());
_listenStreamCache.clear();
_channel.shutdown();
}

void _setupClient({Emulator? emulator}) {
final callOptions = _authenticator != null
? CallOptions(providers: [_authenticator!])
: null;
_listenRequestStreamMap.clear();
_listenStreamCache.clear();
_channel = emulator == null
? ClientChannel(
'firestore.googleapis.com',
Expand Down Expand Up @@ -259,12 +194,8 @@ class FirestoreGateway {
throw e;
}

void _handleDone(String path) {
_listenRequestStreamMap.remove(path);
}

Stream<List<Document>> _mapCollectionStream(
_FirestoreGatewayStreamCache listenRequestStream) {
_ListenStreamWrapper listenRequestStream) {
return listenRequestStream.stream
.where((response) =>
response.hasDocumentChange() ||
Expand All @@ -283,8 +214,8 @@ class FirestoreGateway {
}

Stream<Document?> _mapDocumentStream(
_FirestoreGatewayStreamCache listenRequestStream) {
return listenRequestStream.stream
Stream<ListenResponse> listenRequestStream) {
return listenRequestStream
.where((response) =>
response.hasDocumentChange() ||
response.hasDocumentRemove() ||
Expand All @@ -294,3 +225,87 @@ class FirestoreGateway {
: null);
}
}

/// The number of retries to attempt when a stream throws an error.
const maxStreamReconnectRetries = 5;

class _ListenStreamWrapper {
final void Function() onDone;

final _errors = <_ErrorAndStackTrace>[];
final ListenRequest _listenRequest;
final Stream<ListenResponse> Function(Stream<ListenRequest>)
responseStreamFactory;
late StreamSubscription<ListenResponse>? _responseStreamSubscription;
late StreamController<ListenRequest> _listenRequestStreamController;
late StreamController<ListenResponse> _listenResponseStreamController;
late Map<String, Document> _documentMap;

Stream<ListenResponse> get stream => _listenResponseStreamController.stream;

Map<String, Document> get documentMap => _documentMap;

_ListenStreamWrapper.create(this._listenRequest, this.responseStreamFactory,
{required this.onDone}) {
_documentMap = <String, Document>{};
_listenResponseStreamController =
StreamController<ListenResponse>.broadcast(
onListen: () {
// Only when the response stream is listened to, we start the request stream.
_retry();
},
onCancel: () {
// We close the request stream if there are no more listeners to the response stream.
_errors.clear();
_responseStreamSubscription?.cancel();
close();
},
);
}

void _retry() {
_listenRequestStreamController = StreamController<ListenRequest>();
final responseStream = responseStreamFactory(
_listenRequestStreamController.stream,
);

_responseStreamSubscription = responseStream.listen(
(value) {
// When we receive a new event, we reset the errors, because
// max connection retries are only incremented for consecutive errors.
_errors.clear();
_listenResponseStreamController.add(value);
},
onDone: _listenResponseStreamController.close,
onError: (error, stackTrace) {
_responseStreamSubscription!.cancel();
_responseStreamSubscription = null;

_errors.add(_ErrorAndStackTrace(error, stackTrace));

if (_errors.length == maxStreamReconnectRetries) {
for (var e in _errors) {
_listenResponseStreamController.addError(e.error, e.stackTrace);
}
close();
} else {
_retry();
}
},
);
_listenRequestStreamController.add(_listenRequest);
}

void close() {
_listenRequestStreamController.close();
_listenResponseStreamController.close();
onDone();
}
}

class _ErrorAndStackTrace {
final Object error;
final StackTrace? stackTrace;

_ErrorAndStackTrace(this.error, this.stackTrace);
}

0 comments on commit 4c586dd

Please sign in to comment.