Skip to content

Commit

Permalink
Subscribe to RevisionRecord instead of clock
Browse files Browse the repository at this point in the history
  • Loading branch information
cbaker6 committed Apr 22, 2023
1 parent feb8751 commit db8fa3e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 28 deletions.
4 changes: 2 additions & 2 deletions Sources/ParseCareKit/Models/RemoteSynchronizing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ actor RemoteSynchronizing {
self.clock = clock
}

func hasNewerRevision(_ vector: OCKRevisionRecord.KnowledgeVector, for uuid: UUID) -> Bool {
func hasNewerRevision(_ logicalClock: Int, for uuid: UUID) -> Bool {
guard !isSynchronizing else {
return false
}
guard let currentClock = knowledgeVector?.clock(for: uuid) else {
return true
}
return vector.clock(for: uuid) > currentClock
return logicalClock > currentClock
}
}
11 changes: 11 additions & 0 deletions Sources/ParseCareKit/ParseCareKitConstants.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,14 @@ public enum ClockKey {
/// vector key.
public static let vector = "vector"
}

// MARK: RevisionRecord Class
/// Keys for `RevisionRecord` objects. These keys can be used for querying Parse objects.
enum RevisionRecordKey {
/// className key.
static let className = "RevisionRecord"
/// clockUUID key.
static let clockUUID = "clockUUID"
/// logicalClock key.
static let logicalClock = "logicalClock"
}
3 changes: 2 additions & 1 deletion Sources/ParseCareKit/ParseCareKitLog.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ extension Logger {
static let pushRevisions = Logger(subsystem: subsystem, category: "\(category).pushRevisions")
static let syncProgress = Logger(subsystem: subsystem, category: "\(category).syncProgress")
static let clock = Logger(subsystem: subsystem, category: "\(category).clock")
static let clockSubscription = Logger(subsystem: subsystem, category: "\(category).clockSubscription")
static let revisionRecordSubscription = Logger(subsystem: subsystem,
category: "\(category).revisionRecordSubscription")
static let defaultACL = Logger(subsystem: subsystem, category: "\(category).defaultACL")
static let initializer = Logger(subsystem: subsystem, category: "\(category).initializer")
static let deinitializer = Logger(subsystem: subsystem, category: "\(category).deinitializer")
Expand Down
48 changes: 23 additions & 25 deletions Sources/ParseCareKit/ParseRemote.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public class ParseRemote: OCKRemoteSynchronizable {
public var pckStoreClassesToSynchronize: [PCKStoreClass: any PCKVersionable]!

private weak var parseDelegate: ParseRemoteDelegate?
private var clockSubscription: SubscriptionCallback<PCKClock>?
private var revisionRecordSubscription: SubscriptionCallback<PCKRevisionRecord>?
private var subscribeToServerUpdates: Bool
private let remoteStatus = RemoteSynchronizing()
private let clockQuery: Query<PCKClock>
private let revisionRecordQuery: Query<PCKRevisionRecord>

/**
Creates an instance of ParseRemote.
Expand All @@ -67,7 +67,7 @@ public class ParseRemote: OCKRemoteSynchronizable {
self.pckStoreClassesToSynchronize = try PCKStoreClass.patient.getConcrete()
self.customClassesToSynchronize = nil
self.uuid = uuid
self.clockQuery = PCKClock.query(ClockKey.uuid == uuid)
self.revisionRecordQuery = PCKRevisionRecord.query(RevisionRecordKey.clockUUID == uuid)
self.automaticallySynchronizes = auto
self.subscribeToServerUpdates = subscribeToServerUpdates
if let currentUser = try? await PCKUser.current() {
Expand Down Expand Up @@ -144,7 +144,7 @@ public class ParseRemote: OCKRemoteSynchronizable {
deinit {
Task {
do {
try await clockQuery.unsubscribe()
try await revisionRecordQuery.unsubscribe()
Logger.deinitializer.error("Unsubscribed from clock query")
} catch {
Logger.deinitializer.error("Couldn't unsubscribe from clock query")
Expand Down Expand Up @@ -189,37 +189,36 @@ public class ParseRemote: OCKRemoteSynchronizable {
do {
_ = try await PCKUser.current()
guard self.subscribeToServerUpdates,
self.clockSubscription == nil else {
self.revisionRecordSubscription == nil else {
return
}

do {
self.clockSubscription = try await self.clockQuery.subscribeCallback()
self.clockSubscription?.handleEvent { (_, event) in
self.revisionRecordSubscription = try await self.revisionRecordQuery.subscribeCallback()
self.revisionRecordSubscription?.handleEvent { (_, event) in
switch event {
case .entered(let updatedClock), .updated(let updatedClock):
do {
let updatedVector = try PCKClock.decodeVector(updatedClock)
Task {
guard await self.remoteStatus.hasNewerRevision(updatedVector, for: self.uuid) else {
return
}
self.parseDelegate?.didRequestSynchronization(self)
Logger
.clockSubscription
.log("Parse subscription is notifying that there are updates on the server")
case .entered(let updatedRevision), .updated(let updatedRevision):
guard let logicalClock = updatedRevision.logicalClock else {
Logger
.revisionRecordSubscription
.error("RevisionRecord missing required \"logicalClock\" key: \(updatedRevision)")
return
}
Task {
guard await self.remoteStatus.hasNewerRevision(logicalClock, for: self.uuid) else {
return
}
} catch {
self.parseDelegate?.didRequestSynchronization(self)
Logger
.clockSubscription
.error("Could not decode server clock: \(error)")
.revisionRecordSubscription
.log("Parse remote has updates available")
}
default:
return
}
}
} catch {
Logger.clockSubscription.error("Couldn't subscribe to clock query")
Logger.revisionRecordSubscription.error("Couldn't subscribe to RevisionRecord query")
return
}
} catch {
Expand Down Expand Up @@ -331,8 +330,8 @@ public class ParseRemote: OCKRemoteSynchronizable {
completion(ParseCareKitError.requiredValueCantBeUnwrapped)
return
}

guard await !self.remoteStatus.hasNewerRevision(parseVector, for: self.uuid) else {
let logicalClock = parseVector.clock(for: self.uuid)
guard await !self.remoteStatus.hasNewerRevision(logicalClock, for: self.uuid) else {
let errorString = "New knowledge on server. Pull first then try again"
Logger.pushRevisions.error("\(errorString)")
await self.remoteStatus.notSynchronzing()
Expand All @@ -350,7 +349,6 @@ public class ParseRemote: OCKRemoteSynchronizable {
}

// 8. Push conflict resolutions + local changes to remote
let logicalClock = deviceKnowledge.clock(for: self.uuid)
self.notifyRevisionProgress(0,
total: deviceRevisions.count)

Expand Down

0 comments on commit db8fa3e

Please sign in to comment.