Skip to content

Commit

Permalink
Merge pull request #170 from EventStore/update-deps
Browse files Browse the repository at this point in the history
Update to Tonic 0.10.
  • Loading branch information
YoEight authored Oct 16, 2023
2 parents 8130769 + 81f3b20 commit 51f991d
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 63 deletions.
16 changes: 8 additions & 8 deletions eventstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ hyper = { version = "0.14", features = ["client", "tcp"] }
hyper-rustls = { version = "0.23", features = ["rustls-native-certs", "http2"] }
log = "0.4"
nom = "7"
prost = "0.11"
prost-derive = "0.11"
prost-types = "0.11"
prost = "0.12"
prost-derive = "0.12"
prost-types = "0.12"
rand = { version = "0.8", features = ["small_rng"] }
reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls",
Expand All @@ -45,17 +45,17 @@ rustls-native-certs = "0.6"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1.20", default-features = false, features = ["time"] }
tokio = { version = "1", default-features = false, features = ["time"] }
tokio-rustls = "0.23"
tonic = { version = "0.9", features = ["tls", "tls-roots"] }
tonic = { version = "0.10", features = ["tls", "tls-roots"] }
tower = "0.4"
url = "2"
urlencoding = "2"
uuid = { version = "1.1", features = ["v4", "serde"] }
uuid = { version = "1", features = ["v4", "serde"] }
webpki = "0.22"

[build-dependencies]
tonic-build = { version = "0.9", features = ["prost-build"] }
tonic-build = { version = "0.10.2", features = ["prost-build"] }

[[test]]
name = "integration"
Expand All @@ -65,7 +65,7 @@ names = "0.14"
pretty_env_logger = "0.5"
serde = { version = "1", features = ["derive"] }
testcontainers = "0.14"
tokio = { version = "1.20", default-features = false, features = [
tokio = { version = "1", default-features = false, features = [
"rt-multi-thread",
"macros",
] }
Expand Down
9 changes: 9 additions & 0 deletions eventstore/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ pub fn generate() -> Result<(), Box<dyn std::error::Error>> {

tonic_build::configure()
.build_server(false)
.bytes(&[
"AppendReq.ProposedMessage.custom_metadata",
"AppendReq.ProposedMessage.data",
"BatchAppendReq.ProposedMessage.custom_metadata",
"BatchAppendReq.ProposedMessage.data",
"ReadEvent.RecordedEvent.custom_metadata",
"ReadEvent.RecordedEvent.data",
"StreamIdentifier.stream_name",
])
.out_dir(out_dir)
.compile(&files, &["protos"])?;

Expand Down
69 changes: 35 additions & 34 deletions eventstore/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::types::{

use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use nom::AsBytes;
use persistent::persistent_subscriptions_client::PersistentSubscriptionsClient;
use prost_types::Timestamp;
use shared::{Empty, StreamIdentifier, Uuid};
Expand Down Expand Up @@ -92,15 +93,13 @@ fn convert_event_data(event: EventData) -> streams::AppendReq {
let id = event.id_opt.unwrap_or_else(uuid::Uuid::new_v4);
let id = shared::uuid::Value::String(id.to_string());
let id = Uuid { value: Some(id) };
let custom_metadata = event
.custom_metadata
.map_or_else(Vec::new, |b| (&*b).into());
let custom_metadata = event.custom_metadata.unwrap_or_default();

let msg = append_req::ProposedMessage {
id: Some(id),
metadata: event.metadata,
custom_metadata,
data: (&*event.payload).into(),
data: event.payload,
};

let content = append_req::Content::ProposedMessage(msg);
Expand Down Expand Up @@ -146,13 +145,15 @@ fn convert_proto_recorded_event(
false
};

let stream_id = String::from_utf8(
let stream_id = String::from_utf8_lossy(
event
.stream_identifier
.expect("stream_identifier is always defined")
.stream_name,
.stream_name
.as_bytes(),
)
.expect("It's always UTF-8");
.to_string();

RecordedEvent {
id,
stream_id,
Expand All @@ -162,8 +163,8 @@ fn convert_proto_recorded_event(
is_json,
created,
metadata: event.metadata,
custom_metadata: event.custom_metadata.into(),
data: event.data.into(),
custom_metadata: event.custom_metadata,
data: event.data,
}
}

Expand Down Expand Up @@ -203,13 +204,14 @@ fn convert_persistent_proto_recorded_event(
false
};

let stream_id = String::from_utf8(
let stream_id = String::from_utf8_lossy(
event
.stream_identifier
.expect("stream_identifier is always defined")
.stream_name,
.stream_name
.as_bytes(),
)
.expect("string is UTF-8 valid");
.to_string();

RecordedEvent {
id,
Expand All @@ -220,8 +222,8 @@ fn convert_persistent_proto_recorded_event(
is_json,
created,
metadata: event.metadata,
custom_metadata: event.custom_metadata.into(),
data: event.data.into(),
custom_metadata: event.custom_metadata,
data: event.data,
}
}

Expand All @@ -233,15 +235,13 @@ fn convert_event_data_to_batch_proposed_message(
let id = event.id_opt.unwrap_or_else(uuid::Uuid::new_v4);
let id = shared::uuid::Value::String(id.to_string());
let id = Uuid { value: Some(id) };
let custom_metadata = event
.custom_metadata
.map_or_else(Vec::new, |b| (&*b).into());
let custom_metadata = event.custom_metadata.unwrap_or_default();

batch_append_req::ProposedMessage {
id: Some(id),
metadata: event.metadata,
custom_metadata,
data: (&*event.payload).into(),
data: event.payload,
}
}

Expand Down Expand Up @@ -514,7 +514,7 @@ pub async fn append_to_stream(

let stream = stream.as_ref().to_string();
let stream_identifier = Some(StreamIdentifier {
stream_name: stream.into_bytes(),
stream_name: stream.into_bytes().into(),
});
let header = Content::Options(append_req::Options {
stream_identifier,
Expand Down Expand Up @@ -622,7 +622,7 @@ pub async fn batch_append(
value: Some(correlation_id),
});
let stream_identifier = Some(StreamIdentifier {
stream_name: req.stream_name.into_bytes(),
stream_name: req.stream_name.into_bytes().into(),
});

let expected_stream_position = match req.expected_revision {
Expand Down Expand Up @@ -678,9 +678,10 @@ pub async fn batch_append(
let resp_stream = resp.into_inner();

let mut resp_stream = resp_stream.map_ok(|resp| {
let stream_name =
String::from_utf8(resp.stream_identifier.unwrap().stream_name)
.expect("valid UTF-8 string");
let stream_name = String::from_utf8_lossy(
resp.stream_identifier.unwrap().stream_name.as_bytes(),
)
.to_string();

let correlation_id = raw_uuid_to_uuid(resp.correlation_id.unwrap());
let result = match resp.result.unwrap() {
Expand Down Expand Up @@ -867,7 +868,7 @@ pub async fn read_stream<S: AsRef<str>>(
};

let stream_identifier = Some(StreamIdentifier {
stream_name: stream.as_ref().to_string().into_bytes(),
stream_name: stream.as_ref().to_string().into_bytes().into(),
});
let stream_options = StreamOptions {
stream_identifier,
Expand Down Expand Up @@ -1003,7 +1004,7 @@ pub async fn delete_stream<S: AsRef<str>>(

let expected_stream_revision = Some(expected_stream_revision);
let stream_identifier = Some(StreamIdentifier {
stream_name: stream.as_ref().to_string().into_bytes(),
stream_name: stream.as_ref().to_string().into_bytes().into(),
});
let req_options = Options {
stream_identifier,
Expand Down Expand Up @@ -1062,7 +1063,7 @@ pub async fn tombstone_stream<S: AsRef<str>>(

let expected_stream_revision = Some(expected_stream_revision);
let stream_identifier = Some(StreamIdentifier {
stream_name: stream.as_ref().to_string().into_bytes(),
stream_name: stream.as_ref().to_string().into_bytes().into(),
});
let req_options = Options {
stream_identifier,
Expand Down Expand Up @@ -1315,7 +1316,7 @@ pub fn subscribe_to_stream<S: AsRef<str>>(
};

let stream_identifier = Some(StreamIdentifier {
stream_name: stream_id.as_ref().to_string().into_bytes(),
stream_name: stream_id.as_ref().to_string().into_bytes().into(),
});
let stream_options = StreamOptions {
stream_identifier,
Expand Down Expand Up @@ -1532,7 +1533,7 @@ where
let handle = connection.current_selected_node().await?;
let settings = convert_settings_create(options.settings())?;
let stream_identifier = StreamIdentifier {
stream_name: stream.as_ref().to_string().into_bytes(),
stream_name: stream.as_ref().to_string().into_bytes().into(),
};

let req_options = options.to_create_options(stream_identifier.clone());
Expand Down Expand Up @@ -1586,7 +1587,7 @@ where
let handle = connection.current_selected_node().await?;
let settings = convert_settings_update(options.settings())?;
let stream_identifier = StreamIdentifier {
stream_name: stream.as_ref().to_string().into_bytes(),
stream_name: stream.as_ref().to_string().into_bytes().into(),
};

let req_options = options.to_update_options(stream_identifier.clone());
Expand Down Expand Up @@ -1641,7 +1642,7 @@ pub async fn delete_persistent_subscription<S: AsRef<str>>(

let stream_option = if !to_all {
StreamOption::StreamIdentifier(StreamIdentifier {
stream_name: stream_id.as_ref().to_string().into_bytes(),
stream_name: stream_id.as_ref().to_string().into_bytes().into(),
})
} else {
StreamOption::All(Empty {})
Expand Down Expand Up @@ -1704,7 +1705,7 @@ pub async fn subscribe_to_persistent_subscription<S: AsRef<str>>(

let stream_option = if !to_all {
StreamOption::StreamIdentifier(StreamIdentifier {
stream_name: stream_id.as_ref().to_string().into_bytes(),
stream_name: stream_id.as_ref().to_string().into_bytes().into(),
})
} else {
StreamOption::All(Empty {})
Expand Down Expand Up @@ -2026,7 +2027,7 @@ where
list_req::stream_option::StreamOption::All(Empty {})
} else {
list_req::stream_option::StreamOption::Stream(StreamIdentifier {
stream_name: stream_name.name().to_string().into_bytes(),
stream_name: stream_name.name().to_string().into_bytes().into(),
})
};

Expand Down Expand Up @@ -2274,7 +2275,7 @@ where
replay_parked_req::options::StreamOption::All(Empty {})
} else {
replay_parked_req::options::StreamOption::StreamIdentifier(StreamIdentifier {
stream_name: stream_name.name().to_string().into_bytes(),
stream_name: stream_name.name().to_string().into_bytes().into(),
})
};

Expand Down Expand Up @@ -2342,7 +2343,7 @@ where
get_info_req::options::StreamOption::All(Empty {})
} else {
get_info_req::options::StreamOption::StreamIdentifier(StreamIdentifier {
stream_name: stream_name.name().to_string().into_bytes(),
stream_name: stream_name.name().to_string().into_bytes().into(),
})
};
let options = get_info_req::Options {
Expand Down
4 changes: 2 additions & 2 deletions eventstore/src/event_store/generated/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub struct Empty {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamIdentifier {
#[prost(bytes = "vec", tag = "3")]
pub stream_name: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "3")]
pub stream_name: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
6 changes: 3 additions & 3 deletions eventstore/src/event_store/generated/google_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,20 @@ impl Code {
}
/// The `Status` type defines a logical error model that is suitable for
/// different programming environments, including REST APIs and RPC APIs. It is
/// used by \[gRPC\](<https://github.com/grpc>). Each `Status` message contains
/// used by [gRPC](<https://github.com/grpc>). Each `Status` message contains
/// three pieces of data: error code, error message, and error details.
///
/// You can find out more about this error model and how to work with it in the
/// [API Design Guide](<https://cloud.google.com/apis/design/errors>).
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of \[google.rpc.Code][google.rpc.Code\].
/// The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
#[prost(enumeration = "Code", tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized by the client.
/// [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
#[prost(string, tag = "2")]
pub message: ::prost::alloc::string::String,
/// A list of messages that carry the error details. There is a common set of
Expand Down
8 changes: 4 additions & 4 deletions eventstore/src/event_store/generated/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ pub mod read_resp {
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
#[prost(bytes = "vec", tag = "7")]
pub custom_metadata: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "8")]
pub data: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "7")]
pub custom_metadata: ::prost::bytes::Bytes,
#[prost(bytes = "bytes", tag = "8")]
pub data: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
Expand Down
24 changes: 12 additions & 12 deletions eventstore/src/event_store/generated/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ pub mod read_resp {
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
#[prost(bytes = "vec", tag = "7")]
pub custom_metadata: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "8")]
pub data: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "7")]
pub custom_metadata: ::prost::bytes::Bytes,
#[prost(bytes = "bytes", tag = "8")]
pub data: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
Expand Down Expand Up @@ -323,10 +323,10 @@ pub mod append_req {
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
#[prost(bytes = "vec", tag = "3")]
pub custom_metadata: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "4")]
pub data: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "3")]
pub custom_metadata: ::prost::bytes::Bytes,
#[prost(bytes = "bytes", tag = "4")]
pub data: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
Expand Down Expand Up @@ -504,10 +504,10 @@ pub mod batch_append_req {
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
#[prost(bytes = "vec", tag = "3")]
pub custom_metadata: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "4")]
pub data: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "3")]
pub custom_metadata: ::prost::bytes::Bytes,
#[prost(bytes = "bytes", tag = "4")]
pub data: ::prost::bytes::Bytes,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down

0 comments on commit 51f991d

Please sign in to comment.