Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Nov 3, 2024
1 parent 50ed8d5 commit 77f5fa7
Show file tree
Hide file tree
Showing 21 changed files with 199 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use_serde = ["serde","semver/serde", "bytesize/serde", "humantime-serde", "serde
k8 = ["use_serde", "fluvio-stream-model/k8"]

[dependencies]
cfg-if = { workspace = true }
thiserror = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/src/mirror/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod test_v1_spec {
type K8RemoteSpec = K8Obj<MirrorSpec>;

#[test]
fn read_k8_mirror_json() {
fn read_k8_mirror_json_v1() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_mirror_v1.json").expect("spec"));
let cluster: K8RemoteSpec = serde_json::from_reader(reader).expect("failed to parse topic");
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ pub struct HomePartitionConfig {
pub remote_cluster: String,
pub remote_replica: String,
// if this is set, home will be mirror instead of
#[cfg_attr(
feature = "use_serde",
serde(default, skip_serializing_if = "crate::is_false")
)]
pub source: bool,
}

Expand Down
28 changes: 27 additions & 1 deletion crates/fluvio-controlplane-metadata/src/topic/k8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,33 @@ mod test_spec {
}

#[test]
fn read_k8_topic_partition_mirror_json() {
fn read_k8_topic_partition_mirror_json_v1() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_topic_mirror_down_v1.json").expect("spec"));
let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic");
assert_eq!(topic.metadata.name, "downstream-topic");
assert_eq!(
topic.spec.replicas().to_owned(),
ReplicaSpec::Mirror(MirrorConfig::Home(
vec![
HomePartitionConfig {
remote_cluster: "boat1".to_string(),
remote_replica: "boats-0".to_string(),
..Default::default()
},
HomePartitionConfig {
remote_cluster: "boat2".to_string(),
remote_replica: "boats-0".to_string(),
..Default::default()
}
]
.into()
))
);
}

#[test]
fn read_k8_topic_partition_mirror_json_v2() {
let reader: BufReader<File> =
BufReader::new(File::open("tests/k8_topic_mirror_down_v2.json").expect("spec"));
let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic");
Expand Down
91 changes: 69 additions & 22 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use fluvio_types::defaults::{
use fluvio_types::SpuId;
use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
use fluvio_protocol::{Encoder, Decoder};
use serde::{Deserialize, Deserializer};

use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};

Expand Down Expand Up @@ -279,16 +278,16 @@ impl ReplicaSpec {
Self::Mirror(mirror) => match mirror {
MirrorConfig::Remote(remote_config) => {
if remote_config.target {
"mirror(target)"
"from-home"
} else {
"mirror"
"to-home"
}
}
MirrorConfig::Home(home_config) => {
if home_config.0.source {
"mirror(source)"
"from-remote"
} else {
"mirror"
"to-remote"
}
}
},
Expand Down Expand Up @@ -684,7 +683,7 @@ type Partitions = Vec<HomePartitionConfig>;

#[cfg_attr(
feature = "use_serde",
derive(serde::Deserialize),
derive(serde::Serialize),
serde(rename_all = "camelCase", untagged)
)]
enum MultiHome {
Expand All @@ -698,7 +697,9 @@ enum MultiHome {
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct HomeMirrorConfig(#[serde(deserialize_with = "from_home_v1")] HomeMirrorInner);
pub struct HomeMirrorConfig(
#[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner,
);

impl Deref for HomeMirrorConfig {
type Target = HomeMirrorInner;
Expand Down Expand Up @@ -732,22 +733,61 @@ impl HomeMirrorConfig {
}
}

fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
where
D: Deserializer<'de>,
{
/*
let home: MultiHome = Deserialize::deserialize(deserializer)?;
match home {
MultiHome::V1(v1) => Ok(HomeMirrorInner {
partitions: v1,
source: false,
}),
MultiHome::V2(v2) => Ok(v2),
cfg_if::cfg_if! {
if #[cfg(feature = "use_serde")] {
impl<'de> serde::Deserialize<'de> for MultiHome {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct MultiHomeVisitor;

impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor {
type Value = MultiHome;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("an array or an object")
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut elements = vec![];
while let Some(value) = seq.next_element::<HomePartitionConfig>()? {
elements.push(value);
}
Ok(MultiHome::V1(elements))
}

fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
where
M: serde::de::MapAccess<'de>,
{
use serde::de::value::MapAccessDeserializer;
let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?;
Ok(MultiHome::V2(obj))
}
}

deserializer.deserialize_any(MultiHomeVisitor)
}
}

fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
where
D: serde::Deserializer<'de>,
{
let home: MultiHome = serde::Deserialize::deserialize(deserializer)?;
match home {
MultiHome::V1(v1) => Ok(HomeMirrorInner {
partitions: v1,
source: false,
}),
MultiHome::V2(v2) => Ok(v2),
}
}
}
*/
let home: HomeMirrorInner = Deserialize::deserialize(deserializer)?;
Ok(home)
}

#[derive(Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -1268,6 +1308,13 @@ mod test {
let spec2 = ReplicaSpec::new_assigned(p2);
assert_eq!(spec2.partition_map_str(), Some("".to_string()));
}

#[test]
fn test_deserialize_home_mirror_config() {
let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#;
let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize");
assert_eq!(home_mirror.partitions().len(), 2);
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"apiVersion": "fluvio.infinyon.com/v2",
"kind": "Topic",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"fluvio.infinyon.com/v2\",\"kind\":\"Topic\",\"metadata\":{\"annotations\":{},\"name\":\"test3\",\"namespace\":\"default\"},\"spec\":{\"compressionType\":\"Any\",\"replicas\":{\"mirror\":{\"home\":{\"remoteClusters\":[\"boat1\",\"boat2\"]}}}}}\n"
},
"creationTimestamp": "2023-08-17T17:43:09Z",
"generation": 1,
"name": "downstream-topic",
"namespace": "default",
"resourceVersion": "253907",
"uid": "1694c901-e94d-4f0b-a01f-1a09889b14fd"
},
"spec": {
"compressionType": "Any",
"replicas": {
"mirror": {
"home": [
{
"remoteCluster": "boat1",
"remoteReplica": "boats-0"
},
{
"remoteCluster": "boat2",
"remoteReplica": "boats-0"
}
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
"compressionType": "Any",
"replicas": {
"mirror": {
"home": [
"home": {
"partitions": [
{
"remoteCluster": "boat1",
"remoteReplica": "boats-0"
},
{
"remoteCluster": "boat2",
"remoteReplica": "boats-0"
}
]
}],
"source": false
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-sc/src/services/auth/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ mod test {
default_role.insert(
ObjectType::Mirror,
vec![
ActionUrn::new(Action::Read, Some("edge1".to_string())),
ActionUrn::new(Action::Read, Some("edge2".to_string())),
ActionUrn::new(Action::Read, Some("remote1".to_string())),
ActionUrn::new(Action::Read, Some("remote2".to_string())),
],
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ impl<AC: AuthContext, C: MetadataItem> RemoteFetchingFromHomeController<AC, C> {
.into_iter()
.filter_map(|topic| match topic.spec.replicas() {
ReplicaSpec::Mirror(MirrorConfig::Home(h)) => {
println!("mirror config home: {:?}", h);
let partition_id = h
.partitions()
.iter()
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::core::DefaultSharedGlobalContext;
use crate::mirroring::remote::api_key::MirrorRemoteApiEnum;
use crate::mirroring::remote::remote_api::RemoteMirrorRequest;
use crate::mirroring::remote::sync::{DefaultRemotePartitionSyncRequest, MirrorPartitionSyncRequest};
use crate::mirroring::remote::update_offsets::UpdateEdgeOffsetRequest;
use crate::mirroring::remote::update_offsets::UpdateRemoteOffsetRequest;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::auth::SpuAuthServiceContext;

Expand Down Expand Up @@ -220,7 +220,7 @@ impl MirrorHomeHandler {
RemoteMirrorRequest::SyncRecords(sync_request)=> {
self.sync_record_from_remote(&mut sink,sync_request.request).await?;
}
RemoteMirrorRequest::UpdateEdgeOffset(_req) => {
RemoteMirrorRequest::UpdateRemoteOffset(_req) => {
return Err(anyhow!("received offset request from remote, this should not happen, since we are target"));
}
}
Expand Down Expand Up @@ -319,7 +319,7 @@ impl MirrorHomeHandler {
RemoteMirrorRequest::SyncRecords(_sync_request)=> {
return Err(anyhow!("received sync request from remote, this should not happen, since we are source"));
}
RemoteMirrorRequest::UpdateEdgeOffset(req) => {
RemoteMirrorRequest::UpdateRemoteOffset(req) => {
remote_updated_needed = self.update_from_remote(req)?;
}
}
Expand Down Expand Up @@ -424,7 +424,7 @@ impl MirrorHomeHandler {

// received new offset from edge, this happens when edge is behind
#[instrument(skip(req))]
fn update_from_remote(&self, req: RequestMessage<UpdateEdgeOffsetRequest>) -> Result<bool> {
fn update_from_remote(&self, req: RequestMessage<UpdateRemoteOffsetRequest>) -> Result<bool> {
let leader_leo = self.leader.leo();
let old_remote_leo = self.metrics.get_remote_leo();
let new_remote_leo = req.request.offset().leo;
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/mirroring/home/update_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::replication::leader::ReplicaOffsetRequest;

use super::api_key::MirrorHomeApiEnum;

/// Update home's offset to edge
/// Update home's offset to remote
pub(crate) type UpdateHomeOffsetRequest = ReplicaOffsetRequest;

impl Request for UpdateHomeOffsetRequest {
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-spu/src/mirroring/remote/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use fluvio_types::event::offsets::OffsetChangeListener;
use crate::{
control_plane::SharedMirrorStatusUpdate,
core::{mirror::SharedMirrorLocalStore, GlobalContext},
mirroring::remote::update_offsets::UpdateEdgeOffsetRequest,
mirroring::remote::update_offsets::UpdateRemoteOffsetRequest,
replication::leader::{FollowerNotifier, ReplicaOffsetRequest, SharedLeaderState},
};
use crate::mirroring::home::{
Expand Down Expand Up @@ -631,7 +631,7 @@ where
};

debug!(?offset_request, "sending offset to home");
let req_msg: RequestMessage<UpdateEdgeOffsetRequest> =
let req_msg: RequestMessage<UpdateRemoteOffsetRequest> =
RequestMessage::new_request(offset_request.into()).set_client_id("mirror home");

sink.send_request(&req_msg).await?;
Expand Down
13 changes: 8 additions & 5 deletions crates/fluvio-spu/src/mirroring/remote/remote_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use fluvio_protocol::api::{RequestMessage, ApiMessage, RequestHeader};

use super::api_key::MirrorRemoteApiEnum;
use super::sync::DefaultRemotePartitionSyncRequest;
use super::update_offsets::UpdateEdgeOffsetRequest;
use super::update_offsets::UpdateRemoteOffsetRequest;

/// Requests from remote to home
#[derive(Debug)]
pub enum RemoteMirrorRequest {
SyncRecords(RequestMessage<DefaultRemotePartitionSyncRequest>),
UpdateEdgeOffset(RequestMessage<UpdateEdgeOffsetRequest>),
UpdateRemoteOffset(RequestMessage<UpdateRemoteOffsetRequest>),
}

impl Default for RemoteMirrorRequest {
Expand All @@ -36,9 +36,12 @@ impl ApiMessage for RemoteMirrorRequest {
trace!("decoding with header: {:#?}", header);
let version = header.api_version();
match header.api_key().try_into()? {
MirrorRemoteApiEnum::UpdateEdgeOffset => Ok(Self::UpdateEdgeOffset(
RequestMessage::new(header, UpdateEdgeOffsetRequest::decode_from(src, version)?),
)),
MirrorRemoteApiEnum::UpdateEdgeOffset => {
Ok(Self::UpdateRemoteOffset(RequestMessage::new(
header,
UpdateRemoteOffsetRequest::decode_from(src, version)?,
)))
}
MirrorRemoteApiEnum::SyncRecords => Ok(Self::SyncRecords(RequestMessage::new(
header,
DefaultRemotePartitionSyncRequest::decode_from(src, version)?,
Expand Down
Loading

0 comments on commit 77f5fa7

Please sign in to comment.