feat(providers): add tracing (#113)

* feat(provider): add async tracing

* feat(middleware): add tracing to gas escalator

* chore: make clippy happy
This commit is contained in:
Georgios Konstantopoulos 2020-12-24 22:23:05 +02:00 committed by GitHub
parent 65b50a7668
commit dc4c2a807e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 119 additions and 181 deletions

16
Cargo.lock generated
View File

@ -875,6 +875,8 @@ dependencies = [
"serde-aux",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
"url",
]
@ -902,6 +904,8 @@ dependencies = [
"thiserror",
"tokio",
"tokio-native-tls",
"tracing",
"tracing-futures",
"url",
]
@ -2555,9 +2559,21 @@ dependencies = [
"cfg-if 1.0.0",
"log",
"pin-project-lite 0.2.0",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.17"

View File

@ -7,7 +7,7 @@ use serde::{
use std::{collections::BTreeMap, fmt, str::FromStr};
/// Transaction summary as found in the Txpool Inspection property.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct TxpoolInspectSummary {
/// Recipient (None when contract creation)
pub to: Option<Address>,
@ -122,7 +122,7 @@ pub struct TxpoolContent {
///
/// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details
///
#[derive(Debug, Default, Clone, PartialEq, Deserialize)]
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct TxpoolInspect {
/// pending tx
pub pending: BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,

View File

@ -32,6 +32,8 @@ url = { version = "2.1.1", default-features = false }
# optional for runtime
tokio = { version = "0.2.22", optional = true }
async-std = { version = "1.6.5", optional = true }
tracing = "0.1.22"
tracing-futures = "0.2.4"
[dev-dependencies]
ethers = { version = "0.1.3", path = "../ethers" }

View File

@ -12,6 +12,9 @@ use std::sync::Arc;
use std::{pin::Pin, time::Instant};
use thiserror::Error;
#[cfg(any(feature = "async-std", feature = "tokio"))]
use tracing_futures::Instrument;
#[cfg(all(not(feature = "tokio"), feature = "async-std"))]
use async_std::task::spawn;
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
@ -136,7 +139,11 @@ where
{
let this2 = this.clone();
spawn(async move {
this2.escalate().await.unwrap();
this2
.escalate()
.instrument(tracing::trace_span!("gas-escalation"))
.await
.unwrap();
});
}
@ -170,15 +177,16 @@ where
txs.pop().expect("should have element in vector");
let receipt = self.get_transaction_receipt(tx_hash).await?;
tracing::trace!(tx_hash = ?tx_hash, "checking if exists");
if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self.escalator.get_gas_price(
replacement_tx.gas_price.expect("gas price must be set"),
now.duration_since(time).as_secs(),
);
let new_gas_price = self
.escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs());
let new_txhash = if Some(new_gas_price) != replacement_tx.gas_price {
let new_txhash = if new_gas_price != old_gas_price {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);
@ -188,7 +196,17 @@ where
.send_transaction(replacement_tx.clone(), priority)
.await
{
Ok(tx_hash) => *tx_hash,
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
tracing::trace!(
old_tx_hash = ?tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
"escalated"
);
new_tx_hash
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// ignore "nonce too low" errors because they

View File

@ -42,6 +42,8 @@ async-tls = { version = "0.7.0", optional = true }
# needed for parsing while deserialization in gas oracles
serde-aux = "0.6.1"
auto_impl = "0.4.1"
tracing = "0.1.22"
tracing-futures = "0.2.4"
[dev-dependencies]
ethers = { version = "0.1.3", path = "../ethers" }

View File

@ -140,7 +140,7 @@ pub trait JsonRpcClient: Debug + Send + Sync {
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned;
R: Serialize + DeserializeOwned;
}
use ethers_core::types::*;
@ -333,7 +333,7 @@ pub trait Middleware: Sync + Send + Debug {
async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, Self::Error>
where
T: Into<U256> + Send + Sync,
R: DeserializeOwned + Send + Sync,
R: Serialize + DeserializeOwned + Send + Sync + Debug,
{
self.inner()
.get_filter_changes(id)

View File

@ -121,11 +121,12 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block >= inclusion_block + *this.confirmations - 1 {
if current_block > inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations);
*this.state = PendingTxState::PausedGettingBlockNumber(receipt.clone());
ctx.waker().wake_by_ref();
}

View File

@ -22,6 +22,8 @@ use thiserror::Error;
use url::{ParseError, Url};
use std::{convert::TryFrom, fmt::Debug, time::Duration};
use tracing::trace;
use tracing_futures::Instrument;
/// An abstract provider for interacting with the [Ethereum JSON RPC
/// API](https://github.com/ethereum/wiki/wiki/JSON-RPC). Must be instantiated
@ -70,6 +72,9 @@ pub enum ProviderError {
/// An error during ENS name resolution
#[error("ens name not found: {0}")]
EnsError(String),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
}
/// Types of filters supported by the JSON-RPC.
@ -97,7 +102,26 @@ impl<P: JsonRpcClient> Provider<P> {
self
}
async fn get_block_gen<Tx: Default + Serialize + DeserializeOwned>(
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, ProviderError>
where
T: Debug + Serialize + Send + Sync,
R: Serialize + DeserializeOwned + Debug,
{
let span =
tracing::trace_span!("rpc", method = method, params = ?serde_json::to_string(&params)?);
// https://docs.rs/tracing/0.1.22/tracing/span/struct.Span.html#in-asynchronous-code
let res = async move {
trace!("tx");
let res: R = self.0.request(method, params).await.map_err(Into::into)?;
trace!(rx = ?serde_json::to_string(&res)?);
Ok::<_, ProviderError>(res)
}
.instrument(span)
.await?;
Ok(res)
}
async fn get_block_gen<Tx: Default + Serialize + DeserializeOwned + Debug>(
&self,
id: BlockId,
include_txs: bool,
@ -107,17 +131,13 @@ impl<P: JsonRpcClient> Provider<P> {
Ok(match id {
BlockId::Hash(hash) => {
let hash = utils::serialize(&hash);
self.0
.request("eth_getBlockByHash", [hash, include_txs])
.await
.map_err(Into::into)?
self.request("eth_getBlockByHash", [hash, include_txs])
.await?
}
BlockId::Number(num) => {
let num = utils::serialize(&num);
self.0
.request("eth_getBlockByNumber", [num, include_txs])
.await
.map_err(Into::into)?
self.request("eth_getBlockByNumber", [num, include_txs])
.await?
}
})
}
@ -143,11 +163,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
/// Gets the latest block number via the `eth_BlockNumber` API
async fn get_block_number(&self) -> Result<U64, ProviderError> {
Ok(self
.0
.request("eth_blockNumber", ())
.await
.map_err(Into::into)?)
self.request("eth_blockNumber", ()).await
}
/// Gets the block at `block_hash_or_number` (transaction hashes only)
@ -155,9 +171,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
&self,
block_hash_or_number: T,
) -> Result<Option<Block<TxHash>>, Self::Error> {
Ok(self
.get_block_gen(block_hash_or_number.into(), false)
.await?)
self.get_block_gen(block_hash_or_number.into(), false).await
}
/// Gets the block at `block_hash_or_number` (full transactions included)
@ -165,9 +179,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
&self,
block_hash_or_number: T,
) -> Result<Option<Block<Transaction>>, ProviderError> {
Ok(self
.get_block_gen(block_hash_or_number.into(), true)
.await?)
self.get_block_gen(block_hash_or_number.into(), true).await
}
/// Gets the transaction with `transaction_hash`
@ -176,11 +188,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
transaction_hash: T,
) -> Result<Option<Transaction>, ProviderError> {
let hash = transaction_hash.into();
Ok(self
.0
.request("eth_getTransactionByHash", [hash])
.await
.map_err(Into::into)?)
self.request("eth_getTransactionByHash", [hash]).await
}
/// Gets the transaction receipt with `transaction_hash`
@ -189,29 +197,17 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
transaction_hash: T,
) -> Result<Option<TransactionReceipt>, ProviderError> {
let hash = transaction_hash.into();
Ok(self
.0
.request("eth_getTransactionReceipt", [hash])
.await
.map_err(Into::into)?)
self.request("eth_getTransactionReceipt", [hash]).await
}
/// Gets the current gas price as estimated by the node
async fn get_gas_price(&self) -> Result<U256, ProviderError> {
Ok(self
.0
.request("eth_gasPrice", ())
.await
.map_err(Into::into)?)
self.request("eth_gasPrice", ()).await
}
/// Gets the accounts on the node
async fn get_accounts(&self) -> Result<Vec<Address>, ProviderError> {
Ok(self
.0
.request("eth_accounts", ())
.await
.map_err(Into::into)?)
self.request("eth_accounts", ()).await
}
/// Returns the nonce of the address
@ -227,11 +223,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let from = utils::serialize(&from);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
Ok(self
.0
.request("eth_getTransactionCount", [from, block])
.await
.map_err(Into::into)?)
self.request("eth_getTransactionCount", [from, block]).await
}
/// Returns the account's balance
@ -247,21 +239,13 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let from = utils::serialize(&from);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
Ok(self
.0
.request("eth_getBalance", [from, block])
.await
.map_err(Into::into)?)
self.request("eth_getBalance", [from, block]).await
}
/// Returns the currently configured chain id, a value used in replay-protected
/// transaction signing as introduced by EIP-155.
async fn get_chainid(&self) -> Result<U256, ProviderError> {
Ok(self
.0
.request("eth_chainId", ())
.await
.map_err(Into::into)?)
self.request("eth_chainId", ()).await
}
////// Contract Execution
@ -277,24 +261,14 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
) -> Result<Bytes, ProviderError> {
let tx = utils::serialize(tx);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
Ok(self
.0
.request("eth_call", [tx, block])
.await
.map_err(Into::into)?)
self.request("eth_call", [tx, block]).await
}
/// Sends a transaction to a single Ethereum node and return the estimated amount of gas required (as a U256) to send it
/// This is free, but only an estimate. Providing too little gas will result in a transaction being rejected
/// (while still consuming all provided gas).
async fn estimate_gas(&self, tx: &TransactionRequest) -> Result<U256, ProviderError> {
let tx = utils::serialize(tx);
Ok(self
.0
.request("eth_estimateGas", [tx])
.await
.map_err(Into::into)?)
self.request("eth_estimateGas", [tx]).await
}
/// Sends the transaction to the entire Ethereum network and returns the transaction's hash
@ -322,11 +296,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
}
}
let tx_hash = self
.0
.request("eth_sendTransaction", [tx])
.await
.map_err(Into::into)?;
let tx_hash = self.request("eth_sendTransaction", [tx]).await?;
Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval()))
}
@ -338,11 +308,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
tx: &Transaction,
) -> Result<PendingTransaction<'a, P>, ProviderError> {
let rlp = utils::serialize(&tx.rlp());
let tx_hash = self
.0
.request("eth_sendRawTransaction", [rlp])
.await
.map_err(Into::into)?;
let tx_hash = self.request("eth_sendRawTransaction", [rlp]).await?;
Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval()))
}
@ -354,22 +320,14 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
) -> Result<Signature, ProviderError> {
let data = utils::serialize(&data.into());
let from = utils::serialize(from);
Ok(self
.0
.request("eth_sign", [from, data])
.await
.map_err(Into::into)?)
self.request("eth_sign", [from, data]).await
}
////// Contract state
/// Returns an array (possibly empty) of logs that match the filter
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, ProviderError> {
Ok(self
.0
.request("eth_getLogs", [filter])
.await
.map_err(Into::into)?)
self.request("eth_getLogs", [filter]).await
}
/// Streams matching filter logs
@ -407,7 +365,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
FilterKind::Logs(filter) => ("eth_newFilter", vec![utils::serialize(&filter)]),
};
Ok(self.0.request(method, args).await.map_err(Into::into)?)
self.request(method, args).await
}
/// Uninstalls a filter
@ -416,11 +374,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
id: T,
) -> Result<bool, ProviderError> {
let id = utils::serialize(&id.into());
Ok(self
.0
.request("eth_uninstallFilter", [id])
.await
.map_err(Into::into)?)
self.request("eth_uninstallFilter", [id]).await
}
/// Polling method for a filter, which returns an array of logs which occurred since last poll.
@ -439,14 +393,10 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, ProviderError>
where
T: Into<U256> + Send + Sync,
R: DeserializeOwned + Send + Sync,
R: Serialize + DeserializeOwned + Send + Sync + Debug,
{
let id = utils::serialize(&id.into());
Ok(self
.0
.request("eth_getFilterChanges", [id])
.await
.map_err(Into::into)?)
self.request("eth_getFilterChanges", [id]).await
}
/// Get the storage of an address for a particular slot location
@ -464,11 +414,8 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let from = utils::serialize(&from);
let location = utils::serialize(&location);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
Ok(self
.0
.request("eth_getStorageAt", [from, location, block])
self.request("eth_getStorageAt", [from, location, block])
.await
.map_err(Into::into)?)
}
/// Returns the deployed code at a given address
@ -484,11 +431,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let at = utils::serialize(&at);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
Ok(self
.0
.request("eth_getCode", [at, block])
.await
.map_err(Into::into)?)
self.request("eth_getCode", [at, block]).await
}
////// Ethereum Naming Service
@ -524,33 +467,21 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
/// block(s), as well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content)
async fn txpool_content(&self) -> Result<TxpoolContent, ProviderError> {
Ok(self
.0
.request("txpool_content", ())
.await
.map_err(Into::into)?)
self.request("txpool_content", ()).await
}
/// Returns a summary of all the transactions currently pending for inclusion in the next
/// block(s), as well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect)
async fn txpool_inspect(&self) -> Result<TxpoolInspect, ProviderError> {
Ok(self
.0
.request("txpool_inspect", ())
.await
.map_err(Into::into)?)
self.request("txpool_inspect", ()).await
}
/// Returns the number of transactions currently pending for inclusion in the next block(s), as
/// well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
async fn txpool_status(&self) -> Result<TxpoolStatus, ProviderError> {
Ok(self
.0
.request("txpool_status", ())
.await
.map_err(Into::into)?)
self.request("txpool_status", ()).await
}
/// Executes the given call and returns a number of possible traces for it
@ -563,10 +494,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let req = utils::serialize(&req);
let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
let trace_type = utils::serialize(&trace_type);
self.0
.request("trace_call", [req, trace_type, block])
.await
.map_err(Into::into)
self.request("trace_call", [req, trace_type, block]).await
}
/// Traces a call to `eth_sendRawTransaction` without making the call, returning the traces
@ -577,10 +505,8 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
) -> Result<BlockTrace, ProviderError> {
let data = utils::serialize(&data);
let trace_type = utils::serialize(&trace_type);
self.0
.request("trace_rawTransaction", [data, trace_type])
self.request("trace_rawTransaction", [data, trace_type])
.await
.map_err(Into::into)
}
/// Replays a transaction, returning the traces
@ -591,11 +517,8 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
) -> Result<BlockTrace, ProviderError> {
let hash = utils::serialize(&hash);
let trace_type = utils::serialize(&trace_type);
Ok(self
.0
.request("trace_replayTransaction", [hash, trace_type])
self.request("trace_replayTransaction", [hash, trace_type])
.await
.map_err(Into::into)?)
}
/// Replays all transactions in a block returning the requested traces for each transaction
@ -606,28 +529,20 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
) -> Result<Vec<BlockTrace>, ProviderError> {
let block = utils::serialize(&block);
let trace_type = utils::serialize(&trace_type);
self.0
.request("trace_replayBlockTransactions", [block, trace_type])
self.request("trace_replayBlockTransactions", [block, trace_type])
.await
.map_err(Into::into)
}
/// Returns traces created at given block
async fn trace_block(&self, block: BlockNumber) -> Result<Vec<Trace>, ProviderError> {
let block = utils::serialize(&block);
self.0
.request("trace_block", [block])
.await
.map_err(Into::into)
self.request("trace_block", [block]).await
}
/// Return traces matching the given filter
async fn trace_filter(&self, filter: TraceFilter) -> Result<Vec<Trace>, ProviderError> {
let filter = utils::serialize(&filter);
self.0
.request("trace_filter", vec![filter])
.await
.map_err(Into::into)
self.request("trace_filter", vec![filter]).await
}
/// Returns trace at the given position
@ -639,19 +554,13 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
let hash = utils::serialize(&hash);
let index: Vec<U64> = index.into_iter().map(|i| i.into()).collect();
let index = utils::serialize(&index);
self.0
.request("trace_get", vec![hash, index])
.await
.map_err(Into::into)
self.request("trace_get", vec![hash, index]).await
}
/// Returns all traces of a given transaction
async fn trace_transaction(&self, hash: H256) -> Result<Vec<Trace>, ProviderError> {
let hash = utils::serialize(&hash);
self.0
.request("trace_transaction", vec![hash])
.await
.map_err(Into::into)
self.request("trace_transaction", vec![hash]).await
}
/// Returns all receipts for that block. Must be done on a parity node.
@ -659,10 +568,8 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
&self,
block: T,
) -> Result<Vec<TransactionReceipt>, Self::Error> {
self.0
.request("parity_getBlockReceipts", vec![block.into()])
self.request("parity_getBlockReceipts", vec![block.into()])
.await
.map_err(Into::into)
}
async fn subscribe<T, R>(
@ -674,11 +581,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
R: DeserializeOwned + Send + Sync,
P: PubsubClient,
{
let id: U256 = self
.0
.request("eth_subscribe", params)
.await
.map_err(Into::into)?;
let id: U256 = self.request("eth_subscribe", params).await?;
SubscriptionStream::new(id, self).map_err(Into::into)
}
@ -687,12 +590,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
T: Into<U256> + Send + Sync,
P: PubsubClient,
{
let ok: bool = self
.0
.request("eth_unsubscribe", [id.into()])
.await
.map_err(Into::into)?;
Ok(ok)
self.request("eth_unsubscribe", [id.into()]).await
}
async fn subscribe_blocks(

View File

@ -6,8 +6,9 @@ use futures_core::stream::Stream;
use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
time::Duration,
@ -75,7 +76,7 @@ where
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
R: Send + Sync + DeserializeOwned + 'a,
R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
{
type Item = R;