Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: optional v4 token access #4212

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SharedConfig {
}
}

/// Corresponds to https://docs.rs/fluvio/latest/fluvio/struct.TopicProducerConfigBuilder.html
/// Corresponds to <https://docs.rs/fluvio/latest/fluvio/struct.TopicProducerConfigBuilder.html>
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FluvioProducerConfig {
pub batch_size: Vec<u64>,
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Default for FluvioProducerConfig {
}
}

/// Corresponds to https://docs.rs/fluvio/latest/fluvio/consumer/struct.ConsumerConfigBuilder.html
/// Corresponds to <https://docs.rs/fluvio/latest/fluvio/consumer/struct.ConsumerConfigBuilder.html>
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FluvioConsumerConfig {
pub max_bytes: Vec<u64>,
Expand All @@ -69,7 +69,7 @@ impl Default for FluvioConsumerConfig {
}
}

/// Corresponds to https://docs.rs/fluvio/latest/fluvio/metadata/topic/struct.TopicSpec.html
/// Corresponds to <https://docs.rs/fluvio/latest/fluvio/metadata/topic/struct.TopicSpec.html>
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FluvioTopicConfig {
pub num_partitions: Vec<u64>,
Expand Down
116 changes: 111 additions & 5 deletions crates/fluvio-hub-protocol/src/infinyon_tok.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// minimal login token read module that just exposes a
// 'read_infinyon_token' function to read from the current login config
//
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::Path;

use serde::{Deserialize, Serialize};
use serde_json;
use tracing::debug;

use fluvio_types::defaults::CLI_CONFIG_PATH;
Expand All @@ -20,20 +22,124 @@ type InfinyonRemote = String;

#[derive(thiserror::Error, Debug)]
pub enum InfinyonCredentialError {
#[error("no org access token found, please login or switch to an org with 'fluvio cloud org switch'")]
MissingOrgToken,

#[error("{0}")]
Read(String),

#[error("unable to parse credentials")]
UnableToParseCredentials,
}

pub enum AccessToken {
V3(InfinyonToken),
V4(CliAccessTokens),
}

impl AccessToken {
pub fn get_hub_token(&self) -> Result<String, InfinyonCredentialError> {
match self {
AccessToken::V3(tok) => Ok(tok.to_owned()),
AccessToken::V4(cli_access_tokens) => cli_access_tokens.get_current_org_token(),
}
}

pub fn is_v4(&self) -> bool {
matches!(self, AccessToken::V4(_))
}
}

// multi-org access token output
#[derive(Debug, Serialize, Deserialize)]
pub struct CliAccessTokens {
pub remote: String,
pub user_access_token: String,
pub org_access_tokens: HashMap<String, String>,
}

impl CliAccessTokens {
pub fn get_current_org_name(&self) -> Result<String, InfinyonCredentialError> {
let key = self
.org_access_tokens
.keys()
.next()
.ok_or(InfinyonCredentialError::MissingOrgToken)?;
Ok(key.to_owned())
}
pub fn get_current_org_token(&self) -> Result<String, InfinyonCredentialError> {
let org = self.get_current_org_name()?;
let tok = self
.org_access_tokens
.get(&org)
.ok_or(InfinyonCredentialError::MissingOrgToken)?
.to_owned();
Ok(tok)
}
}

/// replaces old read_infinyon_token
pub fn read_access_token() -> Result<AccessToken, InfinyonCredentialError> {
match read_infinyon_token_v4() {
Ok(cli_access_tokens) => {
println!(
"Using org access: {}",
cli_access_tokens.get_current_org_name()?
);
return Ok(AccessToken::V4(cli_access_tokens));
}
Err(_e) => {
// fallback to old token logic
}
};
let tok = read_infinyon_token_v3()?;
Ok(AccessToken::V3(tok))
}

pub fn read_infinyon_token() -> Result<InfinyonToken, InfinyonCredentialError> {
// the ENV variable should point directly to the applicable profile
if let Ok(profilepath) = env::var(INFINYON_CONFIG_PATH_ENV) {
let cred = Credentials::load(Path::new(&profilepath))?;
debug!("{INFINYON_CONFIG_PATH_ENV} {profilepath} loaded");
return Ok(cred.token);
match read_infinyon_token_v4() {
Ok(cli_access_tokens) => {
tracing::debug!(
"using v4 token for org {}",
cli_access_tokens.get_current_org_name()?
);
return cli_access_tokens.get_current_org_token();
}
Err(_e) => {
// fallback to old token logic
}
};
read_infinyon_token_v3()
}

pub fn read_infinyon_token_v4() -> Result<CliAccessTokens, InfinyonCredentialError> {
const LOGIN_BIN: &str = "fluvio-cloud-v4";

let mut cmd = std::process::Command::new(LOGIN_BIN);
cmd.arg("cli-access-tokens");
cmd.env_remove("RUST_LOG"); // remove RUST_LOG to avoid debug output
match cmd.output() {
Ok(output) => {
let output = String::from_utf8_lossy(&output.stdout);
let cli_access_tokens: CliAccessTokens =
serde_json::from_slice(output.as_bytes()).map_err(|e| {
tracing::debug!("failed to parse v4 output: {}\n$ fluvio-cloud-v4 cli-access-tokens\n-->>{}<<--", e, output);
InfinyonCredentialError::UnableToParseCredentials
})?;
tracing::trace!("cli access tokens: {:#?}", cli_access_tokens);
Ok(cli_access_tokens)
}
Err(e) => {
tracing::debug!("failed to execute v4: {}", e);
Err(InfinyonCredentialError::Read(
"failed to execute v4".to_owned(),
))
}
}
}

// depcreated, will be removed after multi-org is stable
pub fn read_infinyon_token_v3() -> Result<InfinyonToken, InfinyonCredentialError> {
let cfgpath = default_file_path();
// this will read the indirection file to resolve the profile
let cred = Credentials::try_load(cfgpath)?;
Expand Down
32 changes: 25 additions & 7 deletions crates/fluvio-hub-util/src/hubaccess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use fluvio_future::task::run_block_on;
use fluvio_hub_protocol::infinyon_tok::read_infinyon_token_rem;

use fluvio_hub_protocol::{Result, HubError};
use fluvio_hub_protocol::infinyon_tok::read_infinyon_token;
use fluvio_hub_protocol::infinyon_tok::read_access_token;
use fluvio_hub_protocol::constants::{HUB_API_ACT, HUB_API_HUBID, HUB_REMOTE, CLI_CONFIG_HUB};
use fluvio_types::defaults::CLI_CONFIG_PATH;
use fluvio_hub_protocol::infinyon_tok::AccessToken;

#[cfg(not(target_arch = "wasm32"))]
use crate::htclient;
Expand Down Expand Up @@ -149,15 +150,16 @@ impl HubAccess {
action: &str,
authn_token: &str,
) -> Result<String> {
self.make_action_token(action, authn_token.into()).await
let access_token = AccessToken::V3(authn_token.to_string());
self.make_action_token(action, Some(access_token)).await
}

async fn get_action_auth(&self, action: &str) -> Result<String> {
let cloud_token = read_infinyon_token().unwrap_or_default();
self.make_action_token(action, cloud_token).await
let access_token = read_access_token().ok();
self.make_action_token(action, access_token).await
}

async fn make_action_token(&self, action: &str, authn_token: String) -> Result<String> {
async fn make_action_token(&self, action: &str, token: Option<AccessToken>) -> Result<String> {
let host = &self.remote;
let api_url = format!("{host}/{HUB_API_ACT}");
let mat = MsgActionToken {
Expand All @@ -167,8 +169,24 @@ impl HubAccess {
.map_err(|_e| HubError::HubAccess("Failed access setup".to_string()))?;

let mut builder = http::Request::post(&api_url);
if !authn_token.is_empty() {
builder = builder.header("Authorization", &authn_token);
match token {
Some(AccessToken::V4(cli_access_tokens)) => {
let org = cli_access_tokens.get_current_org_name()?;
let tok = cli_access_tokens
.org_access_tokens
.get(&org)
.ok_or(HubError::HubAccess("Missing org token".to_string()))?;
let authn_token = format!("bearer {tok}");
builder = builder.header("Authorization", &authn_token);
}
Some(AccessToken::V3(tok)) => {
// v3 does not use "Bearer" prefix
builder = builder.header("Authorization", &tok);
}
None => {
// no token is allowed for some actions like downloading public
// packages
}
}
let req = builder
.header(http::header::CONTENT_TYPE, mime::JSON.as_str())
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-test-util/test_meta/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct EnvironmentSetup {
#[arg(long, default_value = "1")]
pub topic: u16,

/// Append random as "-<random>" to topic name (before id, if --num-topics > 1)
/// Append random as "-\<random\>" to topic name (before id, if --num-topics > 1)
#[arg(long)]
pub topic_random: bool,

Expand Down
9 changes: 9 additions & 0 deletions crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! The Fluvio Producer module allows applications to send messages to topics in the Fluvio cluster.
//!
//! # Overview
//!
//! This module provides the necessary structures and functions to produce messages to a Fluvio topic.
//! It includes the `TopicProducerPool` struct, which manages the production of messages
//! to specific topics and partitions, respectively.
//!
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -47,6 +55,7 @@ pub use self::output::ProduceOutput;
use self::partition_producer::PartitionProducer;
pub use self::record::{FutureRecordMetadata, RecordMetadata};

/// Pool of producers for a given topic. There is a producer per partition
pub type TopicProducerPool = TopicProducer<SpuSocketPool>;

/// Pool of producers for a given topic. There is a producer per partition
Expand Down