Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom types in broker #5761

Merged
merged 4 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions storage_broker/benches/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use clap::Parser;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};

use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
};

use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time;
Expand Down Expand Up @@ -91,15 +94,23 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
};

let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
let ttid = ProtoTenantTimelineId {
tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(i),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
};
let mut stream = client
.subscribe_safekeeper_info(request)

let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo.into(),
arssher marked this conversation as resolved.
Show resolved Hide resolved
}],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ttid),
}),
};

let mut stream: tonic::Streaming<TypedMessage> = client
.subscribe_by_filter(request)
.await
.unwrap()
.into_inner();
Expand Down
58 changes: 58 additions & 0 deletions storage_broker/proto/broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ service BrokerService {

// Publish safekeeper updates.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};

// Subscribe to all messages, limited by a filter.
rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};

// Publish one message.
rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
}

message SubscribeSafekeeperInfoRequest {
Expand Down Expand Up @@ -48,3 +54,55 @@ message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}

message FilterTenantTimelineId {
// If true, only messages related to `tenant_timeline_id` will be emitted.
// Otherwise, messages for all timelines will be emitted.
bool enabled = 1;
TenantTimelineId tenant_timeline_id = 2;
}

message TypeSubscription {
MessageType type = 1;
}

message SubscribeByFilterRequest {
// Subscription will emit messages only of the specified types. You need to specify
// at least one type to receive any messages.
repeated TypeSubscription types = 1;

// If set and enabled, subscription will emit messages only for the specified tenant/timeline.
optional FilterTenantTimelineId tenant_timeline_id = 2;
}

enum MessageType {
UNKNOWN = 0;
SAFEKEEPER_TIMELINE_INFO = 2;
petuhovskiy marked this conversation as resolved.
Show resolved Hide resolved
SAFEKEEPER_DISCOVERY_REQUEST = 3;
SAFEKEEPER_DISCOVERY_RESPONSE = 4;
}

// A message with a type.
message TypedMessage {
MessageType type = 1;

optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
petuhovskiy marked this conversation as resolved.
Show resolved Hide resolved
}

message SafekeeperDiscoveryRequest {
TenantTimelineId tenant_timeline_id = 1;
}

// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
message SafekeeperDiscoveryResponse {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
// WAL available to download.
uint64 commit_lsn = 3;
// A connection string to use for WAL downloading.
string safekeeper_connstr = 4;
// Availability zone of a safekeeper.
optional string availability_zone = 5;
}
Loading