Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 151 additions & 73 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,27 @@ const MAX_ARRAY_BATCH: usize = 20;
#[cfg(feature = "electrum-discovery")]
use crate::electrum::{DiscoveryManager, ServerFeatures};

fn invalid_params(msg: impl Into<String>) -> Error {
ErrorKind::InvalidParams(msg.into()).into()
}

// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
let script_hash = val.chain_err(|| "missing hash")?;
let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
let script_hash = script_hash.parse().chain_err(|| "non-hex hash")?;
let script_hash = val.ok_or_else(|| invalid_params("missing hash"))?;
let script_hash = script_hash
.as_str()
.ok_or_else(|| invalid_params("non-string hash"))?;
let script_hash = script_hash
.parse()
.map_err(|_| invalid_params("non-hex hash"))?;
Ok(script_hash)
}

fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
let val = val.chain_err(|| format!("missing {}", name))?;
let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
let val = val.ok_or_else(|| invalid_params(format!("missing {}", name)))?;
let val = val
.as_u64()
.ok_or_else(|| invalid_params(format!("non-integer {}", name)))?;
Ok(val as usize)
}

Expand All @@ -56,8 +66,10 @@ fn usize_from_value_or(val: Option<&Value>, name: &str, default: usize) -> Resul
}

fn bool_from_value(val: Option<&Value>, name: &str) -> Result<bool> {
let val = val.chain_err(|| format!("missing {}", name))?;
let val = val.as_bool().chain_err(|| format!("not a bool {}", name))?;
let val = val.ok_or_else(|| invalid_params(format!("missing {}", name)))?;
let val = val
.as_bool()
.ok_or_else(|| invalid_params(format!("not a bool {}", name)))?;
Ok(val)
}

Expand All @@ -68,6 +80,52 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result<
bool_from_value(val, name)
}

// JSON-RPC 2.0 error codes (https://www.jsonrpc.org/specification#error_object),
// plus the application-level codes used by ElectrumX and romanz/electrs.
#[repr(i16)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum JsonRpcV2Error {
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603,
BadRequest = 1,
DaemonError = 2,
}

impl JsonRpcV2Error {
#[inline]
fn into_i16(self) -> i16 {
self as i16
}
}

fn jsonrpc_code(e: &Error) -> JsonRpcV2Error {
match e.kind() {
ErrorKind::InvalidParams(_) => JsonRpcV2Error::InvalidParams,
ErrorKind::TooPopular => JsonRpcV2Error::BadRequest,
ErrorKind::RpcError(..) => JsonRpcV2Error::DaemonError,
_ => JsonRpcV2Error::InternalError,
}
}

#[inline]
fn json_rpc_error(
input: impl core::fmt::Display,
id: Option<&Value>,
code: JsonRpcV2Error,
) -> Value {
json!({
"jsonrpc": "2.0",
"id": id.unwrap_or(&Value::Null),
"error": {
"code": code.into_i16(),
"message": format!("{}", input),
},
})
}

// TODO: implement caching and delta updates
#[trace]
fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<FullHash> {
Expand Down Expand Up @@ -200,9 +258,10 @@ impl Connection {

let features = params
.get(0)
.chain_err(|| "missing features param")?
.ok_or_else(|| invalid_params("missing features param"))?
.clone();
let features = serde_json::from_value(features).chain_err(|| "invalid features")?;
let features =
serde_json::from_value(features).map_err(|_| invalid_params("invalid features"))?;

discovery.add_server_request(self.addr.ip(), features)?;
Ok(json!(true))
Expand Down Expand Up @@ -288,7 +347,7 @@ impl Connection {
}

fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let script_hash = hash_from_value(params.get(0))?;

let history_txids = get_history(&self.query, &script_hash[..], self.txs_limit)?;
let status_hash = get_status_hash(history_txids, &self.query)
Expand All @@ -301,7 +360,7 @@ impl Connection {
}

fn blockchain_scripthash_unsubscribe(&mut self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let script_hash = hash_from_value(params.get(0))?;

match self.status_hashes.remove(&script_hash) {
None => Ok(json!(false)),
Expand All @@ -314,7 +373,7 @@ impl Connection {

#[cfg(not(feature = "liquid"))]
fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let script_hash = hash_from_value(params.get(0))?;
let (chain_stats, mempool_stats) = self.query.stats(&script_hash[..]);

Ok(json!({
Expand All @@ -324,7 +383,7 @@ impl Connection {
}

fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let script_hash = hash_from_value(params.get(0))?;
let history_txids = get_history(&self.query, &script_hash[..], self.txs_limit)?;

Ok(json!(history_txids
Expand All @@ -342,7 +401,7 @@ impl Connection {
}

fn blockchain_scripthash_listunspent(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let script_hash = hash_from_value(params.get(0))?;
let utxos = self.query.utxo(&script_hash[..])?;

let to_json = |utxo: Utxo| {
Expand Down Expand Up @@ -370,8 +429,11 @@ impl Connection {
}

fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
let tx = params.get(0).chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
let tx = params.get(0).ok_or_else(|| invalid_params("missing tx"))?;
let tx = tx
.as_str()
.ok_or_else(|| invalid_params("non-string tx"))?
.to_string();
let txid = self.query.broadcast_raw(&tx)?;
if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
Expand All @@ -380,9 +442,11 @@ impl Connection {
}

fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
let tx_hash = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?);
let tx_hash = Txid::from(hash_from_value(params.get(0))?);
let verbose = match params.get(1) {
Some(value) => value.as_bool().chain_err(|| "non-bool verbose value")?,
Some(value) => value
.as_bool()
.ok_or_else(|| invalid_params("non-bool verbose value"))?,
None => false,
};

Expand All @@ -400,15 +464,15 @@ impl Connection {

#[trace]
fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
let txid = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?);
let txid = Txid::from(hash_from_value(params.get(0))?);
let height = usize_from_value(params.get(1), "height")?;
let blockid = self
.query
.chain()
.tx_confirming_block(&txid)
.ok_or_else(|| "tx not found or is unconfirmed")?;
if blockid.height != height {
bail!("invalid confirmation height provided");
return Err(invalid_params("invalid confirmation height provided"));
}
let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash)
.chain_err(|| "cannot create merkle proof")?;
Expand Down Expand Up @@ -474,10 +538,16 @@ impl Connection {
#[cfg(feature = "electrum-discovery")]
"server.add_peer" => self.server_add_peer(&params),

&_ => bail!("unknown method {} {:?}", method, params),
&_ => {
warn!("rpc #{} unknown method {} {:?}", id, method, params);
return Ok(json_rpc_error(
format!("unknown method {}", method),
Some(id),
JsonRpcV2Error::MethodNotFound,
));
}
};
timer.observe_duration();
// TODO: return application errors should be sent to the client
Ok(match result {
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
Err(e) => {
Expand All @@ -488,7 +558,7 @@ impl Connection {
params,
e.display_chain()
);
json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
json_rpc_error(&e, Some(id), jsonrpc_code(&e))
}
})
}
Expand Down Expand Up @@ -566,25 +636,28 @@ impl Connection {
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
if let Value::Array(arr) = cmd {
if arr.len() > MAX_ARRAY_BATCH {
bail!(
"Too many elements in batch requests {} max:{}",
arr.len(),
MAX_ARRAY_BATCH
);
let reply = match from_str::<Value>(&line) {
Ok(Value::Array(arr)) => {
if arr.len() > MAX_ARRAY_BATCH {
bail!(
"Too many elements in batch requests {} max:{}",
arr.len(),
MAX_ARRAY_BATCH
);
}
let mut result = Vec::with_capacity(arr.len());
for el in arr {
result.push(self.handle_value(el, &empty_params));
}
Value::Array(result)
}
let mut result = Vec::with_capacity(arr.len());
for el in arr {
let reply = self.handle_value(el, &empty_params)?;
result.push(reply)
Ok(cmd) => self.handle_value(cmd, &empty_params),
Err(err) => {
warn!("[{}] invalid JSON request: {}", self.addr, err);
json_rpc_error("parse error", None, JsonRpcV2Error::ParseError)
}
self.send_values(&[Value::Array(result)])?
} else {
let reply = self.handle_value(cmd, &empty_params)?;
self.send_values(&[reply])?
}
};
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self
Expand All @@ -597,41 +670,46 @@ impl Connection {
}
}

fn handle_value(&mut self, cmd: Value, empty_params: &Value) -> Result<Value> {
fn handle_value(&mut self, cmd: Value, empty_params: &Value) -> Value {
let start_time = Instant::now();
Ok(
match (
cmd.get("method"),
cmd.get("params").unwrap_or_else(|| empty_params),
cmd.get("id"),
) {
(Some(&Value::String(ref method)), &Value::Array(ref params), Some(ref id)) => {
let reply = self.handle_command(method, params, id)?;

conditionally_log_rpc_event!(
self,
json!({
"event": "rpc_response",
"method": method,
"params": if self.rpc_logging.hide_params {
Value::Null
} else {
json!(params)
},
"request_size": serde_json::to_vec(&cmd).map(|v| v.len()).unwrap_or(0),
"response_size": reply.to_string().as_bytes().len(),
"duration_micros": start_time.elapsed().as_micros(),
"id": id,
})
);
match (
cmd.get("method"),
cmd.get("params").unwrap_or_else(|| empty_params),
cmd.get("id"),
) {
(Some(&Value::String(ref method)), &Value::Array(ref params), Some(ref id)) => {
let reply = self.handle_command(method, params, id).unwrap_or_else(|e| {
json_rpc_error(
format!("{} failed: {}", method, e),
Some(id),
JsonRpcV2Error::InternalError,
)
});

conditionally_log_rpc_event!(
self,
json!({
"event": "rpc_response",
"method": method,
"params": if self.rpc_logging.hide_params {
Value::Null
} else {
json!(params)
},
"request_size": serde_json::to_vec(&cmd).map(|v| v.len()).unwrap_or(0),
"response_size": reply.to_string().as_bytes().len(),
"duration_micros": start_time.elapsed().as_micros(),
"id": id,
})
);

reply
}
_ => {
bail!("invalid command: {}", cmd)
}
},
)
reply
}
_ => {
warn!("[{}] invalid request: {}", self.addr, cmd);
json_rpc_error("invalid request", cmd.get("id"), JsonRpcV2Error::InvalidRequest)
}
}
}

#[trace]
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ error_chain! {
display("Too many history entries")
}

InvalidParams(msg: String) {
description("Invalid RPC params")
display("{}", msg)
}

#[cfg(feature = "electrum-discovery")]
ElectrumClient(e: electrum_client::Error) {
description("Electrum client error")
Expand Down
Loading
Loading