From 29acd6ba6a1dfa4d702be4b77c36df23890674f8 Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Sun, 2 Jun 2024 15:20:20 -0400 Subject: [PATCH 1/6] feat: Add configuration allowItemToFail to poll mode, and add a gate around each access_token/item update to allow it to fail without failing the entire poll sync --- .../sync/PolledSyncRunner.kt | 25 +++++++++++++------ src/main/resources/application.yml | 3 +++ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index aee4f38..073f13a 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -40,6 +40,8 @@ class PolledSyncRunner( private val cursorFileDirectoryPath: String, @Value("\${fireflyPlaidConnector2.plaid.batchSize}") private val plaidBatchSize: Int, + @Value("false") + private val allowItemToFail: Boolean, private val plaidApiWrapper: PlaidApiWrapper, private val fireflyTxApi: TransactionsApi, @@ -152,11 +154,20 @@ class PolledSyncRunner( * * In sync mode we fetch and retain all Plaid transactions that have changed since the last poll. */ - val response = executeTransactionSyncRequest( - accessToken, - cursorMap[accessToken], - plaidBatchSize - ) + var response: TransactionsSyncResponse? = null; + try { + response = executeTransactionSyncRequest( + accessToken, + cursorMap[accessToken], + plaidBatchSize + ) + } catch (cre: ClientRequestException) { + if (allowItemToFail) { + logger.warn("Querying transactions for access token $accessToken failed, continuing on to the next access token") + continue; + } else throw cre + } + cursorMap[accessToken] = response.nextCursor logger.debug( "Received batch of sync updates for access token $accessToken: " + @@ -172,7 +183,7 @@ class PolledSyncRunner( plaidDeletedTxs.addAll(response.removed.mapNotNull { it.transactionId }) // Keep going until we get all the transactions - } while (response.hasMore) + } while (response?.hasMore == true) } /** * Don't write the cursor map here, wait until after we've successfully committed the transactions @@ -333,4 +344,4 @@ class PolledSyncRunner( terminated.set(true) mainJob.cancel() } -} \ No newline at end of file +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 49e75bc..8e177cc 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -33,6 +33,9 @@ fireflyPlaidConnector2: # This path needs to be writeable by the application's user and needs to be persistent. If you're using Docker, # you may want to use a volume or bind mount for this so that cursor state is persisted between runs. cursorFileDirectoryPath: persistence/ + # Configuration element specifying whether the remainder of the poll sync session should continue if one item + # has failed to fetch from PLaid for any reason. + allowItemToFail: false batch: # The number of days in the past to pull data for. maxSyncDays: 5 From 2fe8112a671c23dd9747883f86192d32b138cdf1 Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Wed, 5 Jun 2024 20:26:03 -0400 Subject: [PATCH 2/6] feat: Fixing autoconfig reference for new allowItemToFail property, moving gate for allowing item updates to fail inside the suspend function to avoid crashing the outer coroutine --- .../sync/PolledSyncRunner.kt | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index 073f13a..1249396 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.stereotype.Component import java.time.LocalDate +import java.util.Collections import java.util.concurrent.atomic.AtomicBoolean import kotlin.io.path.Path import kotlin.time.Duration.Companion.minutes @@ -40,7 +41,7 @@ class PolledSyncRunner( private val cursorFileDirectoryPath: String, @Value("\${fireflyPlaidConnector2.plaid.batchSize}") private val plaidBatchSize: Int, - @Value("false") + @Value("\${fireflyPlaidConnector2.polled.allowItemToFail}") private val allowItemToFail: Boolean, private val plaidApiWrapper: PlaidApiWrapper, @@ -154,19 +155,11 @@ class PolledSyncRunner( * * In sync mode we fetch and retain all Plaid transactions that have changed since the last poll. */ - var response: TransactionsSyncResponse? = null; - try { - response = executeTransactionSyncRequest( - accessToken, - cursorMap[accessToken], - plaidBatchSize - ) - } catch (cre: ClientRequestException) { - if (allowItemToFail) { - logger.warn("Querying transactions for access token $accessToken failed, continuing on to the next access token") - continue; - } else throw cre - } + val response = executeTransactionSyncRequest( + accessToken, + cursorMap[accessToken], + plaidBatchSize + ) cursorMap[accessToken] = response.nextCursor logger.debug( @@ -183,7 +176,7 @@ class PolledSyncRunner( plaidDeletedTxs.addAll(response.removed.mapNotNull { it.transactionId }) // Keep going until we get all the transactions - } while (response?.hasMore == true) + } while (response.hasMore) } /** * Don't write the cursor map here, wait until after we've successfully committed the transactions @@ -335,7 +328,10 @@ class PolledSyncRunner( ).body() } catch (cre: ClientRequestException) { logger.error("Error requesting Plaid transactions. Request: $request; ") - throw cre + if (allowItemToFail) { + logger.warn("Querying transactions for access token $accessToken failed, allowing failure and continuing on to the next access token") + return emptyPlaidResponse() + } else throw cre } } @@ -344,4 +340,16 @@ class PolledSyncRunner( terminated.set(true) mainJob.cancel() } + + companion object { + fun emptyPlaidResponse(): TransactionsSyncResponse = + TransactionsSyncResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "", + false, + "" + ) + } } From e75a6d725f2d99f50175db413fd882f56d0540af Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Tue, 18 Jun 2024 22:11:34 -0400 Subject: [PATCH 3/6] chore: Add inline default for new polled.allowItemToFail config key Co-authored-by: Dan van Kley --- .../net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index 1249396..411d1a5 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -41,7 +41,7 @@ class PolledSyncRunner( private val cursorFileDirectoryPath: String, @Value("\${fireflyPlaidConnector2.plaid.batchSize}") private val plaidBatchSize: Int, - @Value("\${fireflyPlaidConnector2.polled.allowItemToFail}") + @Value("\${fireflyPlaidConnector2.polled.allowItemToFail:false}") private val allowItemToFail: Boolean, private val plaidApiWrapper: PlaidApiWrapper, From 4dfaee003e58e45867563df035c8eb1e0a85f769 Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Tue, 18 Jun 2024 22:17:29 -0400 Subject: [PATCH 4/6] chore: Refactor executeTransactionSyncRequest to return null in cases where an item is allowed to fail rather than an emtpy response --- .../sync/PolledSyncRunner.kt | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index 411d1a5..7cac231 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -84,12 +84,14 @@ class PolledSyncRunner( executeTransactionSyncRequest(accessToken, cursorMap[accessToken], plaidBatchSize) logger.debug( "Received initial batch of sync updates for access token $accessToken. " + - "Updating cursor map to next cursor: ${response.nextCursor}" + "Updating cursor map to next cursor: ${response?.nextCursor}" ) - if (response.nextCursor.isNotBlank()) { - cursorMap[accessToken] = response.nextCursor + response?.nextCursor?.let { + if (it.isNotEmpty()) { + cursorMap[accessToken] = it + } } - } while (response.hasMore) + } while (response?.hasMore == true) } writeCursorMap(cursorMap) @@ -161,22 +163,27 @@ class PolledSyncRunner( plaidBatchSize ) - cursorMap[accessToken] = response.nextCursor + response?.nextCursor?.let { + cursorMap[accessToken] = it + } + logger.debug( "Received batch of sync updates for access token $accessToken: " + - "${response.added.size} created; ${response.modified.size} updated; " + - "${response.removed.size} deleted; next cursor ${response.nextCursor}" + "${response?.added?.size} created; ${response?.modified?.size} updated; " + + "${response?.removed?.size} deleted; next cursor ${response?.nextCursor}" ) /** * The transaction sync endpoint doesn't take accountId as a parameter, so do that filtering here */ - plaidCreatedTxs.addAll(response.added.filter { accountIdSet.contains(it.accountId) }) - plaidUpdatedTxs.addAll(response.modified.filter { accountIdSet.contains(it.accountId) }) - plaidDeletedTxs.addAll(response.removed.mapNotNull { it.transactionId }) + response?.added?.filter { accountIdSet.contains(it.accountId) } + ?.let { plaidCreatedTxs.addAll(it) } + response?.modified?.filter { accountIdSet.contains(it.accountId) } + ?.let { plaidUpdatedTxs.addAll(it) } + response?.removed?.mapNotNull { it.transactionId }?.let { plaidDeletedTxs.addAll(it) } // Keep going until we get all the transactions - } while (response.hasMore) + } while (response?.hasMore == true) } /** * Don't write the cursor map here, wait until after we've successfully committed the transactions @@ -319,7 +326,7 @@ class PolledSyncRunner( accessToken: PlaidAccessToken, cursor: PlaidSyncCursor?, plaidBatchSize: Int - ): TransactionsSyncResponse { + ): TransactionsSyncResponse? { val request = getTransactionSyncRequest(accessToken, cursor, plaidBatchSize) try { return plaidApiWrapper.executeRequest( @@ -330,7 +337,7 @@ class PolledSyncRunner( logger.error("Error requesting Plaid transactions. Request: $request; ") if (allowItemToFail) { logger.warn("Querying transactions for access token $accessToken failed, allowing failure and continuing on to the next access token") - return emptyPlaidResponse() + return null } else throw cre } } @@ -340,16 +347,4 @@ class PolledSyncRunner( terminated.set(true) mainJob.cancel() } - - companion object { - fun emptyPlaidResponse(): TransactionsSyncResponse = - TransactionsSyncResponse( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "", - false, - "" - ) - } } From ab3cab49016a880c0c97d0bd77c5c88c74b3eaf0 Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Wed, 19 Jun 2024 21:46:51 -0400 Subject: [PATCH 5/6] chore: Introduce loop labels and continue statements to short-circuit on failed Plaid request instead of excessive null-safe calls. Fix typo in application.yml describing new allowItemToFail property --- .../sync/PolledSyncRunner.kt | 35 +++++++++---------- src/main/resources/application.yml | 2 +- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index 7cac231..174a884 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -67,7 +67,7 @@ class PolledSyncRunner( val cursorMap = readCursorMap() val (accountMap, accountAccessTokenSequence) = syncHelper.getAllPlaidAccessTokenAccountIdSets() logger.debug("Beginning Plaid sync endpoint cursor initialization") - for ((accessToken, _) in accountAccessTokenSequence) { + cursorCatchupLoop@ for ((accessToken, _) in accountAccessTokenSequence) { /** * If we already have a cursor for this access token, then move on */ @@ -82,16 +82,17 @@ class PolledSyncRunner( do { val response = executeTransactionSyncRequest(accessToken, cursorMap[accessToken], plaidBatchSize) + ?: continue@cursorCatchupLoop logger.debug( "Received initial batch of sync updates for access token $accessToken. " + "Updating cursor map to next cursor: ${response?.nextCursor}" ) - response?.nextCursor?.let { - if (it.isNotEmpty()) { - cursorMap[accessToken] = it - } + + if (response.nextCursor.isNotEmpty()) { + cursorMap[accessToken] = response.nextCursor } - } while (response?.hasMore == true) + + } while (response.hasMore) } writeCursorMap(cursorMap) @@ -142,7 +143,7 @@ class PolledSyncRunner( val plaidUpdatedTxs = mutableListOf() val plaidDeletedTxs = mutableListOf() - for ((accessToken, accountIds) in accountAccessTokenSequence) { + accessTokenLoop@ for ((accessToken, accountIds) in accountAccessTokenSequence) { logger.debug( "Querying Plaid transaction sync endpoint for access token $accessToken " + " and account ids ${accountIds.joinToString("; ")}" @@ -161,29 +162,25 @@ class PolledSyncRunner( accessToken, cursorMap[accessToken], plaidBatchSize - ) + ) ?: continue@accessTokenLoop - response?.nextCursor?.let { - cursorMap[accessToken] = it - } + cursorMap[accessToken] = response.nextCursor logger.debug( "Received batch of sync updates for access token $accessToken: " + - "${response?.added?.size} created; ${response?.modified?.size} updated; " + - "${response?.removed?.size} deleted; next cursor ${response?.nextCursor}" + "${response.added.size} created; ${response.modified.size} updated; " + + "${response.removed.size} deleted; next cursor ${response.nextCursor}" ) /** * The transaction sync endpoint doesn't take accountId as a parameter, so do that filtering here */ - response?.added?.filter { accountIdSet.contains(it.accountId) } - ?.let { plaidCreatedTxs.addAll(it) } - response?.modified?.filter { accountIdSet.contains(it.accountId) } - ?.let { plaidUpdatedTxs.addAll(it) } - response?.removed?.mapNotNull { it.transactionId }?.let { plaidDeletedTxs.addAll(it) } + plaidCreatedTxs.addAll(response.added.filter { accountIdSet.contains(it.accountId) }) + plaidUpdatedTxs.addAll(response.modified.filter { accountIdSet.contains(it.accountId) }) + plaidDeletedTxs.addAll(response.removed.mapNotNull { it.transactionId }) // Keep going until we get all the transactions - } while (response?.hasMore == true) + } while (response.hasMore) } /** * Don't write the cursor map here, wait until after we've successfully committed the transactions diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8e177cc..53a29e4 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -34,7 +34,7 @@ fireflyPlaidConnector2: # you may want to use a volume or bind mount for this so that cursor state is persisted between runs. cursorFileDirectoryPath: persistence/ # Configuration element specifying whether the remainder of the poll sync session should continue if one item - # has failed to fetch from PLaid for any reason. + # has failed to fetch from Plaid for any reason. allowItemToFail: false batch: # The number of days in the past to pull data for. From 706d0d344dd2f3fd6c5a18fa57f0b8037800a037 Mon Sep 17 00:00:00 2001 From: Dave Kichler Date: Wed, 19 Jun 2024 21:55:45 -0400 Subject: [PATCH 6/6] chore: Cleaning up whitespace, imports --- .../fireflyPlaidConnector2/sync/PolledSyncRunner.kt | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt index 174a884..93e93ce 100644 --- a/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt +++ b/src/main/kotlin/net/djvk/fireflyPlaidConnector2/sync/PolledSyncRunner.kt @@ -1,8 +1,6 @@ package net.djvk.fireflyPlaidConnector2.sync -import io.ktor.client.call.* import io.ktor.client.plugins.* -import io.ktor.http.* import kotlinx.coroutines.* import net.djvk.fireflyPlaidConnector2.api.firefly.apis.TransactionsApi import net.djvk.fireflyPlaidConnector2.api.firefly.models.TransactionRead @@ -16,7 +14,6 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.stereotype.Component import java.time.LocalDate -import java.util.Collections import java.util.concurrent.atomic.AtomicBoolean import kotlin.io.path.Path import kotlin.time.Duration.Companion.minutes @@ -85,13 +82,11 @@ class PolledSyncRunner( ?: continue@cursorCatchupLoop logger.debug( "Received initial batch of sync updates for access token $accessToken. " + - "Updating cursor map to next cursor: ${response?.nextCursor}" + "Updating cursor map to next cursor: ${response.nextCursor}" ) - - if (response.nextCursor.isNotEmpty()) { + if (response.nextCursor.isNotBlank()) { cursorMap[accessToken] = response.nextCursor } - } while (response.hasMore) } writeCursorMap(cursorMap) @@ -163,9 +158,7 @@ class PolledSyncRunner( cursorMap[accessToken], plaidBatchSize ) ?: continue@accessTokenLoop - cursorMap[accessToken] = response.nextCursor - logger.debug( "Received batch of sync updates for access token $accessToken: " + "${response.added.size} created; ${response.modified.size} updated; " +