From 29d01bf8492b06bfe28cbbbea6c64f9465a7e243 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 9 May 2024 10:57:18 -0400 Subject: [PATCH] feat: Add migration op tracker --- launchdarkly-server-sdk/Cargo.toml | 2 +- launchdarkly-server-sdk/src/client.rs | 12 +- launchdarkly-server-sdk/src/events/event.rs | 22 +- launchdarkly-server-sdk/src/lib.rs | 3 + launchdarkly-server-sdk/src/migrations/mod.rs | 49 +++ .../src/migrations/tracker.rs | 380 ++++++++++++++++++ 6 files changed, 465 insertions(+), 3 deletions(-) create mode 100644 launchdarkly-server-sdk/src/migrations/mod.rs create mode 100644 launchdarkly-server-sdk/src/migrations/tracker.rs diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index 09d2737..16a15e3 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -23,7 +23,7 @@ lazy_static = "1.4.0" log = "0.4.14" lru = { version = "0.12.0", default-features = false } ring = "0.17.5" -launchdarkly-server-sdk-evaluation = "1.2.0" +launchdarkly-server-sdk-evaluation = { git = "https://github.com/launchdarkly/rust-server-sdk-evaluation", branch = "mk/sc-243562/detail-serialize" } serde = { version = "1.0.132", features = ["derive"] } serde_json = { version = "1.0.73", features = ["float_roundtrip"] } thiserror = "1.0" diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 4e3b70c..c302aa8 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -17,8 +17,8 @@ use super::evaluation::{FlagDetail, FlagDetailConfig}; use super::stores::store::DataStore; use super::stores::store_builders::BuildError as DataStoreError; use crate::config::BuildError as ConfigBuildError; -use crate::events::event::EventFactory; use crate::events::event::InputEvent; +use crate::events::event::{EventFactory, MigrationOpEvent}; use crate::events::processor::EventProcessor; use crate::events::processor_builders::BuildError as EventProcessorError; @@ -678,6 +678,16 @@ impl Client { Ok(()) } + /// track_migration_op_reports a migration operation event. + /// + /// The measurements included in the event are used by LaunchDarkly to enhance support and + /// visibility during migration-assisted technology migrations. + /// + /// Migration operation events can be created with a [crate::MigrationOpTracker]. + pub fn track_migration_op(_event: MigrationOpEvent) { + // TODO: Implement in a future commit. + } + fn variation_internal + Clone>( &self, context: &Context, diff --git a/launchdarkly-server-sdk/src/events/event.rs b/launchdarkly-server-sdk/src/events/event.rs index 19442de..2a5d756 100644 --- a/launchdarkly-server-sdk/src/events/event.rs +++ b/launchdarkly-server-sdk/src/events/event.rs @@ -1,6 +1,7 @@ use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display, Formatter}; +use std::time::Duration; use launchdarkly_server_sdk_evaluation::{ Context, ContextAttributes, Detail, Flag, FlagValue, Kind, Reason, Reference, VariationIndex, @@ -8,6 +9,8 @@ use launchdarkly_server_sdk_evaluation::{ use serde::ser::SerializeStruct; use serde::{Serialize, Serializer}; +use crate::migrations::{Operation, Origin, Stage}; + #[derive(Clone, Debug, PartialEq)] pub struct BaseEvent { pub creation_date: u64, @@ -92,6 +95,23 @@ impl BaseEvent { } } +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +/// MigrationOpEventData is generated through the migration op tracker provided through the SDK. +pub struct MigrationOpEvent { + #[serde(flatten)] + pub(crate) base: BaseEvent, + pub(crate) flag: Flag, + pub(crate) operation: Operation, + pub(crate) default_stage: Stage, + pub(crate) evaluation: Detail, + pub(crate) invoked: HashSet, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) consistency_check: Option, + pub(crate) errors: HashSet, + pub(crate) latency: HashMap, +} + #[derive(Clone, Debug, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct FeatureRequestEvent { @@ -290,7 +310,7 @@ impl EventFactory { Self { send_reason } } - fn now() -> u64 { + pub(crate) fn now() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() diff --git a/launchdarkly-server-sdk/src/lib.rs b/launchdarkly-server-sdk/src/lib.rs index af46c8a..e2df15d 100644 --- a/launchdarkly-server-sdk/src/lib.rs +++ b/launchdarkly-server-sdk/src/lib.rs @@ -32,6 +32,7 @@ pub use data_source_builders::{ BuildError as DataSourceBuildError, PollingDataSourceBuilder, StreamingDataSourceBuilder, }; pub use evaluation::{FlagDetail, FlagDetailConfig}; +pub use events::event::MigrationOpEvent; pub use events::processor::EventProcessor; pub use events::processor_builders::{ BuildError as EventProcessorBuildError, EventProcessorBuilder, NullEventProcessorBuilder, @@ -40,6 +41,7 @@ pub use feature_requester_builders::{ BuildError as FeatureRequestBuilderError, FeatureRequesterFactory, }; pub use launchdarkly_server_sdk_evaluation::{Flag, Segment, Versioned}; +pub use migrations::{MigrationOpTracker, Operation, Origin, Stage}; pub use service_endpoints::ServiceEndpointsBuilder; pub use stores::persistent_store::{PersistentDataStore, PersistentStoreError}; pub use stores::persistent_store_builders::{ @@ -56,6 +58,7 @@ mod evaluation; mod events; mod feature_requester; mod feature_requester_builders; +mod migrations; mod reqwest; mod service_endpoints; mod stores; diff --git a/launchdarkly-server-sdk/src/migrations/mod.rs b/launchdarkly-server-sdk/src/migrations/mod.rs new file mode 100644 index 0000000..6c5ef3b --- /dev/null +++ b/launchdarkly-server-sdk/src/migrations/mod.rs @@ -0,0 +1,49 @@ +use serde::Serialize; + +#[non_exhaustive] +#[derive(Debug, Clone, Serialize, Eq, Hash, PartialEq)] +#[serde(rename_all = "lowercase")] +/// Origin represents the source of origin for a migration-related operation. +pub enum Origin { + /// Old represents the technology source we are migrating away from. + Old, + /// New represents the technology source we are migrating towards. + New, +} + +#[non_exhaustive] +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "lowercase")] +/// Operation represents a type of migration operation; namely, read or write. +pub enum Operation { + /// Read denotes a read-related migration operation. + Read, + /// Write denotes a write-related migration operation. + Write, +} + +#[non_exhaustive] +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "lowercase")] +/// Stage denotes one of six possible stages a technology migration could be a +/// part of, progressing through the following order. +/// +/// Off -> DualWrite -> Shadow -> Live -> RampDown -> Complete +pub enum Stage { + /// Off - migration hasn't started, "old" is authoritative for reads and writes + Off, + /// DualWrite - write to both "old" and "new", "old" is authoritative for reads + DualWrite, + /// Shadow - both "new" and "old" versions run with a preference for "old" + Shadow, + /// Live - both "new" and "old" versions run with a preference for "new" + Live, + /// RampDown - only read from "new", write to "old" and "new" + Rampdown, + /// Complete - migration is done + Complete, +} + +pub use tracker::MigrationOpTracker; + +mod tracker; diff --git a/launchdarkly-server-sdk/src/migrations/tracker.rs b/launchdarkly-server-sdk/src/migrations/tracker.rs new file mode 100644 index 0000000..7d7ceda --- /dev/null +++ b/launchdarkly-server-sdk/src/migrations/tracker.rs @@ -0,0 +1,380 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Mutex, + time::Duration, +}; + +use launchdarkly_server_sdk_evaluation::{Context, Detail, Flag, FlagValue}; + +use crate::events::event::{BaseEvent, EventFactory, MigrationOpEvent}; + +use super::{Operation, Origin, Stage}; + +/// An MigrationOpTracker is responsible for managing the collection of measurements that which a user might wish to record +/// throughout a migration-assisted operation. +/// +/// Example measurements include latency, errors, and consistency. +pub struct MigrationOpTracker { + flag: Flag, + context: Context, + detail: Detail, + default_stage: Stage, + + mutex: Mutex<()>, + + operation: Option, + invoked: HashSet, + consistent: Option, + errors: HashSet, + latencies: HashMap, +} + +impl MigrationOpTracker { + // TODO: Temporary until further migration work has been completed + #[allow(dead_code)] + fn new(flag: Flag, context: Context, detail: Detail, default_stage: Stage) -> Self { + Self { + flag, + context, + detail, + default_stage, + mutex: Mutex::new(()), + operation: None, + invoked: HashSet::new(), + consistent: None, + errors: HashSet::new(), + latencies: HashMap::new(), + } + } + + /// Sets the migration related operation associated with these tracking measurements. + pub fn operation(&mut self, operation: Operation) { + match self.mutex.lock() { + Ok(_guard) => self.operation = Some(operation), + Err(e) => { + error!("failed to acquire lock for operation tracking: {}", e); + } + } + } + + /// Allows recording which origins were called during a migration. + pub fn invoked(&mut self, origin: Origin) { + match self.mutex.lock() { + Ok(_guard) => _ = self.invoked.insert(origin), + Err(e) => { + error!("failed to acquire lock for invocation tracking: {}", e); + } + } + } + + /// This method accepts a callable which should take no parameters and return a single boolean + /// to represent the consistency check results for a read operation. + /// + /// A callable is provided in case sampling rules do not require consistency checking to run. + /// In this case, we can avoid the overhead of a function by not using the callable. + pub fn consistent(&mut self, is_consistent: impl Fn() -> bool) { + // TODO: We need to add sampling here at some point. + match self.mutex.lock() { + Ok(_guard) => self.consistent = Some(is_consistent()), + Err(e) => { + error!("failed to acquire lock for consistency tracking: {}", e); + } + } + } + + /// Allows recording which origins were called during a migration. + pub fn error(&mut self, origin: Origin) { + match self.mutex.lock() { + Ok(_guard) => _ = self.errors.insert(origin), + Err(e) => { + error!("failed to acquire lock for invocation tracking: {}", e); + } + } + } + + /// Allows tracking the recorded latency for an individual operation. + pub fn latency(&mut self, origin: Origin, latency: Duration) { + if latency.is_zero() { + return; + } + + match self.mutex.lock() { + Ok(_guard) => _ = self.latencies.insert(origin, latency), + Err(e) => { + error!("failed to acquire lock for latency tracking: {}", e); + } + } + } + + /// Creates an instance of [crate::MigrationOpEvent]. This event data can be + /// provided to the [crate::Client::track_migration_op] method to rely this metric + /// information upstream to LaunchDarkly services. + pub fn build(&self) -> Result { + let _guard = self + .mutex + .lock() + .map_err(|e| format!("failed to acquire lock for building event: {:?}", e))?; + + let operation = self + .operation + .clone() + .ok_or_else(|| "operation not provided".to_string())?; + + self.check_invoked_consistency()?; + + let invoked = self.invoked.clone(); + if invoked.is_empty() { + return Err("no origins were invoked".to_string()); + } + + Ok(MigrationOpEvent { + base: BaseEvent::new(EventFactory::now(), self.context.clone()), + flag: self.flag.clone(), + operation, + default_stage: self.default_stage.clone(), + evaluation: self.detail.clone(), + invoked, + consistency_check: self.consistent, + errors: self.errors.clone(), + latency: self.latencies.clone(), + }) + } + + fn check_invoked_consistency(&self) -> Result<(), String> { + for origin in [Origin::Old, Origin::New].iter() { + if self.invoked.contains(origin) { + continue; + } + + if self.errors.contains(origin) { + return Err(format!( + "provided error for origin {:?} without recording invocation", + origin + )); + } + + if self.latencies.contains_key(origin) { + return Err(format!( + "provided latency for origin {:?} without recording invocation", + origin + )); + } + } + + if self.consistent.is_some() && self.invoked.len() != 2 { + return Err("provided consistency without recording both invocations".to_string()); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, FlagValue, Reason}; + use test_case::test_case; + + use super::{MigrationOpTracker, Operation, Origin, Stage}; + use crate::test_common::basic_flag; + + fn minimal_tracker() -> MigrationOpTracker { + let mut tracker = MigrationOpTracker::new( + basic_flag("flag-key"), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(FlagValue::Bool(true)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + return tracker; + } + + #[test] + fn build_minimal_tracker() { + let tracker = minimal_tracker(); + let result = tracker.build(); + + assert!(result.is_ok()); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_invocations_individually(origin: Origin) { + let mut tracker = MigrationOpTracker::new( + basic_flag("flag-key"), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(FlagValue::Bool(true)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(origin.clone()); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.invoked.len(), 1); + assert!(event.invoked.contains(&origin)); + } + + #[test] + fn tracks_both_invocations() { + let mut tracker = MigrationOpTracker::new( + basic_flag("flag-key"), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(FlagValue::Bool(true)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.invoked.len(), 2); + assert!(event.invoked.contains(&Origin::Old)); + assert!(event.invoked.contains(&Origin::New)); + } + + #[test_case(false)] + #[test_case(true)] + fn tracks_consistency(expectation: bool) { + let mut tracker = minimal_tracker(); + tracker.operation(Operation::Read); + tracker.consistent(|| expectation); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.consistency_check, Some(expectation)); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_errors_individually(origin: Origin) { + let mut tracker = minimal_tracker(); + tracker.error(origin.clone()); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.errors.len(), 1); + assert!(event.errors.contains(&origin)); + } + + #[test] + fn tracks_both_errors() { + let mut tracker = minimal_tracker(); + tracker.error(Origin::Old); + tracker.error(Origin::New); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.errors.len(), 2); + assert!(event.errors.contains(&Origin::Old)); + assert!(event.errors.contains(&Origin::New)); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_latencies_individually(origin: Origin) { + let mut tracker = minimal_tracker(); + tracker.latency(origin.clone(), std::time::Duration::from_millis(100)); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.latency.len(), 1); + assert_eq!( + event.latency.get(&origin), + Some(&std::time::Duration::from_millis(100)) + ); + } + + #[test] + fn track_both_latencies() { + let mut tracker = minimal_tracker(); + tracker.latency(Origin::Old, std::time::Duration::from_millis(100)); + tracker.latency(Origin::New, std::time::Duration::from_millis(200)); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.latency.len(), 2); + assert_eq!( + event.latency.get(&Origin::Old), + Some(&std::time::Duration::from_millis(100)) + ); + assert_eq!( + event.latency.get(&Origin::New), + Some(&std::time::Duration::from_millis(200)) + ); + } + + #[test] + fn fails_without_calling_invocations() { + let mut tracker = MigrationOpTracker::new( + basic_flag("flag-key"), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(FlagValue::Bool(true)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + + let failure = tracker + .build() + .err() + .expect("tracker should have failed to build event"); + + assert_eq!(failure, "no origins were invoked"); + } + + #[test] + fn fails_without_operation() { + let mut tracker = MigrationOpTracker::new( + basic_flag("flag-key"), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(FlagValue::Bool(true)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + let failure = tracker + .build() + .err() + .expect("tracker should have failed to build event"); + + assert_eq!(failure, "operation not provided"); + } +}