Skip to content

Commit

Permalink
pink-runtime: HTTP egress via ocall
Browse files Browse the repository at this point in the history
  • Loading branch information
kvinwang committed Mar 8, 2023
1 parent 14b3d9b commit 69c812c
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 155 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/phactory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ phala-serde-more = { path = "../phala-serde-more" }
phala-crypto = { path = "../phala-crypto", features = ["getrandom", "stream"] }
prpc = { path = "../prpc" }
pink = { path = "../pink/runner", package = "pink-runner" }
pink-extension-runtime = { path = "../pink/pink-extension-runtime" }

sp-io = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.37", features = ["disable_panic_handler", "disable_oom", "disable_allocator"] }
sp-runtime-interface = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.37", features = ["disable_target_static_assertions"] }
Expand Down
73 changes: 59 additions & 14 deletions crates/phactory/src/contracts/pink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use phala_types::contract::messaging::ResourceType;
use pink::{
capi::v1::{
ecall::{ECalls, ECallsRo},
ocall::{ExecContext, OCalls, StorageChanges},
ocall::{ExecContext, HttpRequest, HttpRequestError, HttpResponse, OCalls, StorageChanges},
},
local_cache::{self, StorageQuotaExceeded},
runtimes::v1::using_ocalls,
Expand Down Expand Up @@ -125,6 +125,8 @@ impl RuntimeHandle<'_> {
}

pub(crate) mod context {
use std::time::{Duration, Instant};

use pink::{
capi::v1::ocall::ExecContext,
types::{BlockNumber, ExecutionMode},
Expand All @@ -138,6 +140,7 @@ pub(crate) mod context {
fn chain_storage(&self) -> &ChainStorage;
fn exec_context(&self) -> ExecContext;
fn worker_pubkey(&self) -> [u8; 32];
fn call_elapsed(&self) -> Duration;
}

pub struct ContractExecContext {
Expand All @@ -146,6 +149,26 @@ pub(crate) mod context {
pub block_number: BlockNumber,
pub worker_pubkey: [u8; 32],
pub chain_storage: ChainStorage,
pub start_at: Instant,
}

impl ContractExecContext {
pub fn new(
mode: ExecutionMode,
now_ms: u64,
block_number: BlockNumber,
worker_pubkey: [u8; 32],
chain_storage: ChainStorage,
) -> Self {
Self {
mode,
now_ms,
block_number,
worker_pubkey,
chain_storage,
start_at: Instant::now(),
}
}
}

impl GetContext for ContractExecContext {
Expand All @@ -160,9 +183,14 @@ pub(crate) mod context {
now_ms: self.now_ms,
}
}

fn worker_pubkey(&self) -> [u8; 32] {
self.worker_pubkey
}

fn call_elapsed(&self) -> Duration {
self.start_at.elapsed()
}
}

pub fn get() -> ExecContext {
Expand All @@ -180,6 +208,15 @@ pub(crate) mod context {
pub fn worker_pubkey() -> [u8; 32] {
exec_context::with(|ctx| ctx.worker_pubkey()).unwrap_or_default()
}

pub fn call_elapsed() -> Duration {
exec_context::with(|ctx| ctx.call_elapsed()).unwrap_or_else(|| Duration::from_secs(0))
}

pub fn time_remaining() -> u64 {
const MAX_QUERY_TIME: Duration = Duration::from_secs(10);
MAX_QUERY_TIME.saturating_sub(call_elapsed()).as_millis() as _
}
}

impl OCalls for RuntimeHandle<'_> {
Expand Down Expand Up @@ -262,6 +299,10 @@ impl OCalls for RuntimeHandle<'_> {
fn latest_system_code(&self) -> Vec<u8> {
context::with(|ctx| ctx.chain_storage().pink_system_code().1)
}

fn http_request(&self, request: HttpRequest) -> Result<HttpResponse, HttpRequestError> {
pink_extension_runtime::http_request(request, context::time_remaining())
}
}

impl OCalls for RuntimeHandleMut<'_> {
Expand Down Expand Up @@ -318,6 +359,10 @@ impl OCalls for RuntimeHandleMut<'_> {
fn latest_system_code(&self) -> Vec<u8> {
self.readonly().latest_system_code()
}

fn http_request(&self, request: HttpRequest) -> Result<HttpResponse, HttpRequestError> {
self.readonly().http_request(request)
}
}

impl v1::CrossCall for RuntimeHandle<'_> {
Expand Down Expand Up @@ -461,13 +506,13 @@ impl Cluster {
} else {
ExecutionMode::Query
};
let mut ctx = context::ContractExecContext {
let mut ctx = context::ContractExecContext::new(
mode,
now_ms: context.now_ms,
block_number: context.block_number,
worker_pubkey: context.worker_pubkey,
chain_storage: context.chain_storage,
};
context.now_ms,
context.block_number,
context.worker_pubkey,
context.chain_storage,
);
let log_handler = context.log_handler.clone();
context::using(&mut ctx, move || {
let origin = origin.cloned().ok_or(QueryError::BadOrigin)?;
Expand Down Expand Up @@ -538,13 +583,13 @@ impl Cluster {
.or(Err(QueryError::ServiceUnavailable))?;

let origin = origin.cloned().ok_or(QueryError::BadOrigin)?;
let mut ctx = context::ContractExecContext {
mode: ExecutionMode::Estimating,
now_ms: context.now_ms,
block_number: context.block_number,
worker_pubkey: context.worker_pubkey,
chain_storage: context.chain_storage,
};
let mut ctx = context::ContractExecContext::new(
ExecutionMode::Estimating,
context.now_ms,
context.block_number,
context.worker_pubkey,
context.chain_storage,
);
let log_handler = context.log_handler.clone();
context::using(&mut ctx, move || {
let mut runtime = self.runtime_mut(log_handler);
Expand Down
8 changes: 4 additions & 4 deletions crates/phactory/src/prpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,13 @@ impl<Platform: pal::Platform + Serialize + DeserializeOwned> Phactory<Platform>
let now_ms = state.chain_storage.timestamp_now();
let chain_storage = state.chain_storage.snapshot();
let block_number = block.block_header.number;
let mut context = contracts::pink::context::ContractExecContext {
mode: ExecutionMode::Transaction,
let mut context = contracts::pink::context::ContractExecContext::new(
ExecutionMode::Transaction,
now_ms,
block_number,
worker_pubkey: pubkey,
pubkey,
chain_storage,
};
);
self.check_requirements();
contracts::pink::context::using(&mut context, || {
self.handle_inbound_messages(block_number)
Expand Down
12 changes: 6 additions & 6 deletions crates/pink/capi/src/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ pub mod ecall {
pub mod ocall {
use super::{CrossCallMut, Executing, OCall};
use crate::types::{AccountId, BlockNumber, ExecSideEffects, ExecutionMode, Hash};
use pink_extension::chain_extension::StorageQuotaExceeded;
use pink_macro::cross_call;
use scale::{Decode, Encode};

pub use pink_extension::chain_extension::{
HttpRequest, HttpRequestError, HttpResponse, StorageQuotaExceeded,
};
pub type StorageChanges = Vec<(Vec<u8>, (Vec<u8>, i32))>;

#[derive(Decode, Encode, Clone, Debug, Default)]
Expand All @@ -133,11 +135,7 @@ pub mod ocall {
}

impl ExecContext {
pub fn new(
mode: ExecutionMode,
block_number: BlockNumber,
now_ms: u64,
) -> Self {
pub fn new(mode: ExecutionMode, block_number: BlockNumber, now_ms: u64) -> Self {
Self {
mode,
block_number,
Expand Down Expand Up @@ -177,5 +175,7 @@ pub mod ocall {
fn cache_remove(&self, contract: Vec<u8>, key: Vec<u8>) -> Option<Vec<u8>>;
#[xcall(id = 13)]
fn latest_system_code(&self) -> Vec<u8>;
#[xcall(id = 14)]
fn http_request(&self, request: HttpRequest) -> Result<HttpResponse, HttpRequestError>;
}
}
157 changes: 80 additions & 77 deletions crates/pink/pink-extension-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::{

use pink_extension::{
chain_extension::{
self as ext, HttpRequest, HttpResponse, PinkExtBackend, SigType, StorageQuotaExceeded,
self as ext, HttpRequest, HttpRequestError, HttpResponse, PinkExtBackend, SigType,
StorageQuotaExceeded,
},
Balance, EcdhPublicKey, EcdsaPublicKey, EcdsaSignature, Hash,
};
Expand All @@ -25,7 +26,6 @@ pub trait PinkRuntimeEnv {
type AccountId: AsRef<[u8]> + Display;

fn address(&self) -> &Self::AccountId;
fn call_elapsed(&self) -> Option<Duration>;
}

pub struct DefaultPinkExtension<'a, T, Error> {
Expand All @@ -42,87 +42,90 @@ impl<'a, T, E> DefaultPinkExtension<'a, T, E> {
}
}

impl<T: PinkRuntimeEnv, E: From<&'static str>> PinkExtBackend for DefaultPinkExtension<'_, T, E> {
type Error = E;
fn http_request(&self, request: HttpRequest) -> Result<HttpResponse, Self::Error> {
// Hardcoded limitations for now
const MAX_QUERY_TIME: Duration = Duration::from_secs(10);
const MAX_BODY_SIZE: usize = 1024 * 256; // 256KB

let elapsed = self.env.call_elapsed().ok_or("Invalid exec env")?;
if elapsed >= MAX_QUERY_TIME {
return Err("Query time limitation exceeded".into());
}
let timeout = MAX_QUERY_TIME.saturating_sub(elapsed);

let url: reqwest::Url = request.url.parse().or(Err("Invalid url"))?;

let client = reqwest::blocking::Client::builder()
.timeout(timeout)
.env_proxy(url.host_str().unwrap_or_default())
.build()
.or(Err("Failed to create client"))?;

let method: Method =
FromStr::from_str(request.method.as_str()).or(Err("Invalid HTTP method"))?;
let mut headers = HeaderMap::new();
for (key, value) in &request.headers {
let key = HeaderName::from_str(key.as_str()).or(Err("Invalid HTTP header key"))?;
let value = HeaderValue::from_str(value).or(Err("Invalid HTTP header value"))?;
headers.insert(key, value);
}

let result = client
.request(method, url)
.headers(headers)
.body(request.body)
.send();

let mut response = match result {
Ok(response) => response,
Err(err) => {
// If there is somthing wrong with the network, we can not inspect the reason too
// much here. Let it return a non-standard 523 here.
log::info!("HTTP request error: {err}");
return Ok(HttpResponse {
status_code: 523,
reason_phrase: "Unreachable".into(),
body: format!("{err:?}").into_bytes(),
headers: vec![],
});
}
};

let headers: Vec<_> = response
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default().into()))
.collect();

let mut body = Vec::new();
let mut writer = LimitedWriter::new(&mut body, MAX_BODY_SIZE);
pub fn http_request(
request: HttpRequest,
timeout_ms: u64,
) -> Result<HttpResponse, HttpRequestError> {
if timeout_ms == 0 {
return Err(HttpRequestError::Timeout);
}
let timeout = Duration::from_millis(timeout_ms);
let url: reqwest::Url = request.url.parse().or(Err(HttpRequestError::InvalidUrl))?;
let client = reqwest::blocking::Client::builder()
.timeout(timeout)
.env_proxy(url.host_str().unwrap_or_default())
.build()
.or(Err(HttpRequestError::FailedToCreateClient))?;

let method: Method =
FromStr::from_str(request.method.as_str()).or(Err(HttpRequestError::InvalidMethod))?;
let mut headers = HeaderMap::new();
for (key, value) in &request.headers {
let key =
HeaderName::from_str(key.as_str()).or(Err(HttpRequestError::InvalidHeaderName))?;
let value = HeaderValue::from_str(value).or(Err(HttpRequestError::InvalidHeaderValue))?;
headers.insert(key, value);
}

if let Err(err) = response.copy_to(&mut writer) {
log::info!("Failed to read HTTP body: {err}");
let result = client
.request(method, url)
.headers(headers)
.body(request.body)
.send();

let mut response = match result {
Ok(response) => response,
Err(err) => {
// If there is somthing wrong with the network, we can not inspect the reason too
// much here. Let it return a non-standard 523 here.
log::info!("HTTP request error: {err}");
return Ok(HttpResponse {
status_code: 524,
reason_phrase: "IO Error".into(),
status_code: 523,
reason_phrase: "Unreachable".into(),
body: format!("{err:?}").into_bytes(),
headers: vec![],
});
};
}
};

let headers: Vec<_> = response
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default().into()))
.collect();

const MAX_BODY_SIZE: usize = 1024 * 16; // 16KB

let mut body = Vec::new();
let mut writer = LimitedWriter::new(&mut body, MAX_BODY_SIZE);

if let Err(err) = response.copy_to(&mut writer) {
log::info!("Failed to read HTTP body: {err}");
return Ok(HttpResponse {
status_code: 524,
reason_phrase: "IO Error".into(),
body: format!("{err:?}").into_bytes(),
headers: vec![],
});
};

let response = HttpResponse {
status_code: response.status().as_u16(),
reason_phrase: response
.status()
.canonical_reason()
.unwrap_or_default()
.into(),
body,
headers,
};
Ok(response)
}

let response = HttpResponse {
status_code: response.status().as_u16(),
reason_phrase: response
.status()
.canonical_reason()
.unwrap_or_default()
.into(),
body,
headers,
};
Ok(response)
impl<T: PinkRuntimeEnv, E: From<&'static str>> PinkExtBackend for DefaultPinkExtension<'_, T, E> {
type Error = E;
fn http_request(&self, request: HttpRequest) -> Result<HttpResponse, Self::Error> {
http_request(request, 10 * 1000).map_err(|err| err.display().into())
}

fn sign(
Expand Down
Loading

0 comments on commit 69c812c

Please sign in to comment.