Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #24 from stevenroose/batching
Browse files Browse the repository at this point in the history
Support JSON-RPC batching
  • Loading branch information
apoelstra authored Apr 5, 2019
2 parents 972b7d4 + 261d77c commit 02697b7
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 4 deletions.
61 changes: 57 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! and parsing responses
//!
use std::collections::HashMap;
use std::io;
use std::io::Read;
use std::sync::{Arc, Mutex};
Expand All @@ -29,6 +30,7 @@ use serde;
use serde_json;

use super::{Request, Response};
use util::HashableValue;
use error::Error;

/// A handle to a remote JSONRPC server
Expand Down Expand Up @@ -67,10 +69,14 @@ impl Client {
Ok(response.into_result()?)
}

/// Sends a request to a client
pub fn send_request(&self, request: &Request) -> Result<Response, Error> {
/// The actual send logic used by both [send_request] and [send_batch].
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
where
B: serde::ser::Serialize,
R: for<'de> serde::de::Deserialize<'de>,
{
// Build request
let request_raw = serde_json::to_vec(request)?;
let request_raw = serde_json::to_vec(body)?;

// Setup connection
let mut headers = Headers::new();
Expand Down Expand Up @@ -115,8 +121,14 @@ impl Client {

// nb we ignore stream.status since we expect the body
// to contain information about any error
let response: Response = serde_json::from_reader(&mut stream)?;
let response: R = serde_json::from_reader(&mut stream)?;
stream.bytes().count(); // Drain the stream so it can be reused
Ok(response)
}

/// Sends a request to a client
pub fn send_request(&self, request: &Request) -> Result<Response, Error> {
let response: Response = self.send_raw(&request)?;
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
return Err(Error::VersionMismatch);
}
Expand All @@ -126,6 +138,47 @@ impl Client {
Ok(response)
}

/// Sends a batch of requests to the client. The return vector holds the response
/// for the request at the corresponding index. If no response was provided, it's [None].
///
/// Note that the requests need to have valid IDs, so it is advised to create the requests
/// with [build_request].
pub fn send_batch(&self, requests: &[Request]) -> Result<Vec<Option<Response>>, Error> {
if requests.len() < 1 {
return Err(Error::EmptyBatch);
}

// If the request body is invalid JSON, the response is a single response object.
// We ignore this case since we are confident we are producing valid JSON.
let responses: Vec<Response> = self.send_raw(&requests)?;
if responses.len() > requests.len() {
return Err(Error::WrongBatchResponseSize);
}

// To prevent having to clone responses, we first copy all the IDs so we can reference
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
// should be inexpensive and require no allocations as Numbers are more common.
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
// First index responses by ID and catch duplicate IDs.
let mut resp_by_id = HashMap::new();
for (id, resp) in ids.iter().zip(responses.into_iter()) {
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
return Err(Error::BatchDuplicateResponseId(dup.id));
}
}
// Match responses to the requests.
let results =
requests.into_iter().map(|r| resp_by_id.remove(&HashableValue(&r.id))).collect();

// Since we're also just producing the first duplicate ID, we can also just produce the
// first incorrect ID in case there are multiple.
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
return Err(Error::WrongBatchResponseId(incorrect.1.id));
}

Ok(results)
}

/// Builds a request
pub fn build_request<'a, 'b>(
&self,
Expand Down
18 changes: 18 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ pub enum Error {
NonceMismatch,
/// Response to a request had a jsonrpc field other than "2.0"
VersionMismatch,
/// Batches can't be empty
EmptyBatch,
/// Too many responses returned in batch
WrongBatchResponseSize,
/// Batch response contained a duplicate ID
BatchDuplicateResponseId(serde_json::Value),
/// Batch response contained an ID that didn't correspond to any request ID
WrongBatchResponseId(serde_json::Value),
}

impl From<serde_json::Error> for Error {
Expand All @@ -63,6 +71,10 @@ impl fmt::Display for Error {
Error::Json(ref e) => write!(f, "JSON decode error: {}", e),
Error::Hyper(ref e) => write!(f, "Hyper error: {}", e),
Error::Rpc(ref r) => write!(f, "RPC error response: {:?}", r),
Error::BatchDuplicateResponseId(ref v) => {
write!(f, "duplicate RPC batch response ID: {}", v)
}
Error::WrongBatchResponseId(ref v) => write!(f, "wrong RPC batch response ID: {}", v),
_ => f.write_str(error::Error::description(self)),
}
}
Expand All @@ -76,6 +88,12 @@ impl error::Error for Error {
Error::Rpc(_) => "RPC error response",
Error::NonceMismatch => "Nonce of response did not match nonce of request",
Error::VersionMismatch => "`jsonrpc` field set to non-\"2.0\"",
Error::EmptyBatch => "batches can't be empty",
Error::WrongBatchResponseSize => "too many responses returned in batch",
Error::BatchDuplicateResponseId(_) => "batch response contained a duplicate ID",
Error::WrongBatchResponseId(_) => {
"batch response contained an ID that didn't correspond to any request ID"
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub extern crate serde_json;

pub mod client;
pub mod error;
mod util;

// Re-export error type
pub use error::Error;
Expand Down Expand Up @@ -158,4 +159,18 @@ mod tests {
assert!(recovered1.is_err());
assert!(recovered2.is_err());
}

#[test]
fn batch_response() {
// from the jsonrpc.org spec example
let s = r#"[
{"jsonrpc": "2.0", "result": 7, "id": "1"},
{"jsonrpc": "2.0", "result": 19, "id": "2"},
{"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null},
{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"},
{"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"}
]"#;
let batch_response: Vec<Response> = serde_json::from_str(&s).unwrap();
assert_eq!(batch_response.len(), 5);
}
}
120 changes: 120 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Rust JSON-RPC Library
// Written in 2019 by
// Andrew Poelstra <[email protected]>
//
// To the extent possible under law, the author(s) have dedicated all
// copyright and related and neighboring rights to this software to
// the public domain worldwide. This software is distributed without
// any warranty.
//
// You should have received a copy of the CC0 Public Domain Dedication
// along with this software.
// If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
//

use std::hash::{Hash, Hasher};

use serde_json::Value;

/// Newtype around `Value` which allows hashing for use as hashmap keys
/// This is needed for batch requests.
///
/// The reason `Value` does not support `Hash` or `Eq` by itself
/// is that it supports `f64` values; but for batch requests we
/// will only be hashing the "id" field of the request/response
/// pair, which should never need decimal precision and therefore
/// never use `f64`.
#[derive(Clone, PartialEq, Debug)]
pub struct HashableValue<'a>(pub &'a Value);

impl<'a> Eq for HashableValue<'a> {}

impl<'a> Hash for HashableValue<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
match *self.0 {
Value::Null => "null".hash(state),
Value::Bool(false) => "false".hash(state),
Value::Bool(true) => "true".hash(state),
Value::Number(ref n) => {
"number".hash(state);
if let Some(n) = n.as_i64() {
n.hash(state);
} else if let Some(n) = n.as_u64() {
n.hash(state);
} else {
n.to_string().hash(state);
}
},
Value::String(ref s) => {
"string".hash(state);
s.hash(state);
},
Value::Array(ref v) => {
"array".hash(state);
v.len().hash(state);
for obj in v {
HashableValue(obj).hash(state);
}
},
Value::Object(ref m) => {
"object".hash(state);
m.len().hash(state);
for (key, val) in m {
key.hash(state);
HashableValue(val).hash(state);
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::str::FromStr;

use super::*;

#[test]
fn hash_value() {
let val = Value::from_str("null").unwrap();
let val = HashableValue(&val);

let t = Value::from_str("true").unwrap();
let t = HashableValue(&t);

let f = Value::from_str("false").unwrap();
let f = HashableValue(&f);

let ns = Value::from_str("[0, -0, 123.4567, -100000000]").unwrap();
let ns = HashableValue(&ns);

let m = Value::from_str("{ \"field\": 0, \"field\": -0 }").unwrap();
let m = HashableValue(&m);

let mut coll = HashSet::new();

assert!(!coll.contains(&val));
coll.insert(val.clone());
assert!(coll.contains(&val));

assert!(!coll.contains(&t));
assert!(!coll.contains(&f));
coll.insert(t.clone());
assert!(coll.contains(&t));
assert!(!coll.contains(&f));
coll.insert(f.clone());
assert!(coll.contains(&t));
assert!(coll.contains(&f));

assert!(!coll.contains(&ns));
coll.insert(ns.clone());
assert!(coll.contains(&ns));

assert!(!coll.contains(&m));
coll.insert(m.clone());
assert!(coll.contains(&m));
}
}


0 comments on commit 02697b7

Please sign in to comment.