diff --git a/Cargo.lock b/Cargo.lock index db209756..9232ff07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,6 +875,8 @@ dependencies = [ "serde-aux", "thiserror", "tokio", + "tracing", + "tracing-futures", "url", ] @@ -902,6 +904,8 @@ dependencies = [ "thiserror", "tokio", "tokio-native-tls", + "tracing", + "tracing-futures", "url", ] @@ -2555,9 +2559,21 @@ dependencies = [ "cfg-if 1.0.0", "log", "pin-project-lite 0.2.0", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.17" diff --git a/ethers-core/src/types/txpool.rs b/ethers-core/src/types/txpool.rs index 38d439b0..3c36fa73 100644 --- a/ethers-core/src/types/txpool.rs +++ b/ethers-core/src/types/txpool.rs @@ -7,7 +7,7 @@ use serde::{ use std::{collections::BTreeMap, fmt, str::FromStr}; /// Transaction summary as found in the Txpool Inspection property. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize)] pub struct TxpoolInspectSummary { /// Recipient (None when contract creation) pub to: Option
, @@ -122,7 +122,7 @@ pub struct TxpoolContent { /// /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details /// -#[derive(Debug, Default, Clone, PartialEq, Deserialize)] +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] pub struct TxpoolInspect { /// pending tx pub pending: BTreeMap>, diff --git a/ethers-middleware/Cargo.toml b/ethers-middleware/Cargo.toml index 001dda01..b455dc4b 100644 --- a/ethers-middleware/Cargo.toml +++ b/ethers-middleware/Cargo.toml @@ -32,6 +32,8 @@ url = { version = "2.1.1", default-features = false } # optional for runtime tokio = { version = "0.2.22", optional = true } async-std = { version = "1.6.5", optional = true } +tracing = "0.1.22" +tracing-futures = "0.2.4" [dev-dependencies] ethers = { version = "0.1.3", path = "../ethers" } diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 97bbb6a8..d138f0a9 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -12,6 +12,9 @@ use std::sync::Arc; use std::{pin::Pin, time::Instant}; use thiserror::Error; +#[cfg(any(feature = "async-std", feature = "tokio"))] +use tracing_futures::Instrument; + #[cfg(all(not(feature = "tokio"), feature = "async-std"))] use async_std::task::spawn; #[cfg(all(feature = "tokio", not(feature = "async-std")))] @@ -136,7 +139,11 @@ where { let this2 = this.clone(); spawn(async move { - this2.escalate().await.unwrap(); + this2 + .escalate() + .instrument(tracing::trace_span!("gas-escalation")) + .await + .unwrap(); }); } @@ -170,15 +177,16 @@ where txs.pop().expect("should have element in vector"); let receipt = self.get_transaction_receipt(tx_hash).await?; + tracing::trace!(tx_hash = ?tx_hash, "checking if exists"); if receipt.is_none() { + let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); // Get the new gas price based on how much time passed since the // tx was last broadcast - let new_gas_price = self.escalator.get_gas_price( - replacement_tx.gas_price.expect("gas price must be set"), - now.duration_since(time).as_secs(), - ); + let new_gas_price = self + .escalator + .get_gas_price(old_gas_price, now.duration_since(time).as_secs()); - let new_txhash = if Some(new_gas_price) != replacement_tx.gas_price { + let new_txhash = if new_gas_price != old_gas_price { // bump the gas price replacement_tx.gas_price = Some(new_gas_price); @@ -188,7 +196,17 @@ where .send_transaction(replacement_tx.clone(), priority) .await { - Ok(tx_hash) => *tx_hash, + Ok(new_tx_hash) => { + let new_tx_hash = *new_tx_hash; + tracing::trace!( + old_tx_hash = ?tx_hash, + new_tx_hash = ?new_tx_hash, + old_gas_price = ?old_gas_price, + new_gas_price = ?new_gas_price, + "escalated" + ); + new_tx_hash + } Err(err) => { if err.to_string().contains("nonce too low") { // ignore "nonce too low" errors because they diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 965e98bf..4b979370 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -42,6 +42,8 @@ async-tls = { version = "0.7.0", optional = true } # needed for parsing while deserialization in gas oracles serde-aux = "0.6.1" auto_impl = "0.4.1" +tracing = "0.1.22" +tracing-futures = "0.2.4" [dev-dependencies] ethers = { version = "0.1.3", path = "../ethers" } diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index f00895d9..80bdecd2 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -140,7 +140,7 @@ pub trait JsonRpcClient: Debug + Send + Sync { async fn request(&self, method: &str, params: T) -> Result where T: Debug + Serialize + Send + Sync, - R: DeserializeOwned; + R: Serialize + DeserializeOwned; } use ethers_core::types::*; @@ -333,7 +333,7 @@ pub trait Middleware: Sync + Send + Debug { async fn get_filter_changes(&self, id: T) -> Result, Self::Error> where T: Into + Send + Sync, - R: DeserializeOwned + Send + Sync, + R: Serialize + DeserializeOwned + Send + Sync + Debug, { self.inner() .get_filter_changes(id) diff --git a/ethers-providers/src/pending_transaction.rs b/ethers-providers/src/pending_transaction.rs index 2f4482db..b5b292db 100644 --- a/ethers-providers/src/pending_transaction.rs +++ b/ethers-providers/src/pending_transaction.rs @@ -121,11 +121,12 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { // if the transaction has at least K confirmations, return the receipt // (subtract 1 since the tx already has 1 conf when it's mined) - if current_block >= inclusion_block + *this.confirmations - 1 { + if current_block > inclusion_block + *this.confirmations - 1 { let receipt = *receipt.clone(); *this.state = PendingTxState::Completed; return Poll::Ready(Ok(receipt)); } else { + tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations); *this.state = PendingTxState::PausedGettingBlockNumber(receipt.clone()); ctx.waker().wake_by_ref(); } diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 635edafb..692c4d22 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -22,6 +22,8 @@ use thiserror::Error; use url::{ParseError, Url}; use std::{convert::TryFrom, fmt::Debug, time::Duration}; +use tracing::trace; +use tracing_futures::Instrument; /// An abstract provider for interacting with the [Ethereum JSON RPC /// API](https://github.com/ethereum/wiki/wiki/JSON-RPC). Must be instantiated @@ -70,6 +72,9 @@ pub enum ProviderError { /// An error during ENS name resolution #[error("ens name not found: {0}")] EnsError(String), + + #[error(transparent)] + SerdeJson(#[from] serde_json::Error), } /// Types of filters supported by the JSON-RPC. @@ -97,7 +102,26 @@ impl Provider

{ self } - async fn get_block_gen( + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: Serialize + DeserializeOwned + Debug, + { + let span = + tracing::trace_span!("rpc", method = method, params = ?serde_json::to_string(¶ms)?); + // https://docs.rs/tracing/0.1.22/tracing/span/struct.Span.html#in-asynchronous-code + let res = async move { + trace!("tx"); + let res: R = self.0.request(method, params).await.map_err(Into::into)?; + trace!(rx = ?serde_json::to_string(&res)?); + Ok::<_, ProviderError>(res) + } + .instrument(span) + .await?; + Ok(res) + } + + async fn get_block_gen( &self, id: BlockId, include_txs: bool, @@ -107,17 +131,13 @@ impl Provider

{ Ok(match id { BlockId::Hash(hash) => { let hash = utils::serialize(&hash); - self.0 - .request("eth_getBlockByHash", [hash, include_txs]) - .await - .map_err(Into::into)? + self.request("eth_getBlockByHash", [hash, include_txs]) + .await? } BlockId::Number(num) => { let num = utils::serialize(&num); - self.0 - .request("eth_getBlockByNumber", [num, include_txs]) - .await - .map_err(Into::into)? + self.request("eth_getBlockByNumber", [num, include_txs]) + .await? } }) } @@ -143,11 +163,7 @@ impl Middleware for Provider

{ /// Gets the latest block number via the `eth_BlockNumber` API async fn get_block_number(&self) -> Result { - Ok(self - .0 - .request("eth_blockNumber", ()) - .await - .map_err(Into::into)?) + self.request("eth_blockNumber", ()).await } /// Gets the block at `block_hash_or_number` (transaction hashes only) @@ -155,9 +171,7 @@ impl Middleware for Provider

{ &self, block_hash_or_number: T, ) -> Result>, Self::Error> { - Ok(self - .get_block_gen(block_hash_or_number.into(), false) - .await?) + self.get_block_gen(block_hash_or_number.into(), false).await } /// Gets the block at `block_hash_or_number` (full transactions included) @@ -165,9 +179,7 @@ impl Middleware for Provider

{ &self, block_hash_or_number: T, ) -> Result>, ProviderError> { - Ok(self - .get_block_gen(block_hash_or_number.into(), true) - .await?) + self.get_block_gen(block_hash_or_number.into(), true).await } /// Gets the transaction with `transaction_hash` @@ -176,11 +188,7 @@ impl Middleware for Provider

{ transaction_hash: T, ) -> Result, ProviderError> { let hash = transaction_hash.into(); - Ok(self - .0 - .request("eth_getTransactionByHash", [hash]) - .await - .map_err(Into::into)?) + self.request("eth_getTransactionByHash", [hash]).await } /// Gets the transaction receipt with `transaction_hash` @@ -189,29 +197,17 @@ impl Middleware for Provider

{ transaction_hash: T, ) -> Result, ProviderError> { let hash = transaction_hash.into(); - Ok(self - .0 - .request("eth_getTransactionReceipt", [hash]) - .await - .map_err(Into::into)?) + self.request("eth_getTransactionReceipt", [hash]).await } /// Gets the current gas price as estimated by the node async fn get_gas_price(&self) -> Result { - Ok(self - .0 - .request("eth_gasPrice", ()) - .await - .map_err(Into::into)?) + self.request("eth_gasPrice", ()).await } /// Gets the accounts on the node async fn get_accounts(&self) -> Result, ProviderError> { - Ok(self - .0 - .request("eth_accounts", ()) - .await - .map_err(Into::into)?) + self.request("eth_accounts", ()).await } /// Returns the nonce of the address @@ -227,11 +223,7 @@ impl Middleware for Provider

{ let from = utils::serialize(&from); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); - Ok(self - .0 - .request("eth_getTransactionCount", [from, block]) - .await - .map_err(Into::into)?) + self.request("eth_getTransactionCount", [from, block]).await } /// Returns the account's balance @@ -247,21 +239,13 @@ impl Middleware for Provider

{ let from = utils::serialize(&from); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); - Ok(self - .0 - .request("eth_getBalance", [from, block]) - .await - .map_err(Into::into)?) + self.request("eth_getBalance", [from, block]).await } /// Returns the currently configured chain id, a value used in replay-protected /// transaction signing as introduced by EIP-155. async fn get_chainid(&self) -> Result { - Ok(self - .0 - .request("eth_chainId", ()) - .await - .map_err(Into::into)?) + self.request("eth_chainId", ()).await } ////// Contract Execution @@ -277,24 +261,14 @@ impl Middleware for Provider

{ ) -> Result { let tx = utils::serialize(tx); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); - Ok(self - .0 - .request("eth_call", [tx, block]) - .await - .map_err(Into::into)?) + self.request("eth_call", [tx, block]).await } /// Sends a transaction to a single Ethereum node and return the estimated amount of gas required (as a U256) to send it /// This is free, but only an estimate. Providing too little gas will result in a transaction being rejected /// (while still consuming all provided gas). async fn estimate_gas(&self, tx: &TransactionRequest) -> Result { - let tx = utils::serialize(tx); - - Ok(self - .0 - .request("eth_estimateGas", [tx]) - .await - .map_err(Into::into)?) + self.request("eth_estimateGas", [tx]).await } /// Sends the transaction to the entire Ethereum network and returns the transaction's hash @@ -322,11 +296,7 @@ impl Middleware for Provider

{ } } - let tx_hash = self - .0 - .request("eth_sendTransaction", [tx]) - .await - .map_err(Into::into)?; + let tx_hash = self.request("eth_sendTransaction", [tx]).await?; Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval())) } @@ -338,11 +308,7 @@ impl Middleware for Provider

{ tx: &Transaction, ) -> Result, ProviderError> { let rlp = utils::serialize(&tx.rlp()); - let tx_hash = self - .0 - .request("eth_sendRawTransaction", [rlp]) - .await - .map_err(Into::into)?; + let tx_hash = self.request("eth_sendRawTransaction", [rlp]).await?; Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval())) } @@ -354,22 +320,14 @@ impl Middleware for Provider

{ ) -> Result { let data = utils::serialize(&data.into()); let from = utils::serialize(from); - Ok(self - .0 - .request("eth_sign", [from, data]) - .await - .map_err(Into::into)?) + self.request("eth_sign", [from, data]).await } ////// Contract state /// Returns an array (possibly empty) of logs that match the filter async fn get_logs(&self, filter: &Filter) -> Result, ProviderError> { - Ok(self - .0 - .request("eth_getLogs", [filter]) - .await - .map_err(Into::into)?) + self.request("eth_getLogs", [filter]).await } /// Streams matching filter logs @@ -407,7 +365,7 @@ impl Middleware for Provider

{ FilterKind::Logs(filter) => ("eth_newFilter", vec![utils::serialize(&filter)]), }; - Ok(self.0.request(method, args).await.map_err(Into::into)?) + self.request(method, args).await } /// Uninstalls a filter @@ -416,11 +374,7 @@ impl Middleware for Provider

{ id: T, ) -> Result { let id = utils::serialize(&id.into()); - Ok(self - .0 - .request("eth_uninstallFilter", [id]) - .await - .map_err(Into::into)?) + self.request("eth_uninstallFilter", [id]).await } /// Polling method for a filter, which returns an array of logs which occurred since last poll. @@ -439,14 +393,10 @@ impl Middleware for Provider

{ async fn get_filter_changes(&self, id: T) -> Result, ProviderError> where T: Into + Send + Sync, - R: DeserializeOwned + Send + Sync, + R: Serialize + DeserializeOwned + Send + Sync + Debug, { let id = utils::serialize(&id.into()); - Ok(self - .0 - .request("eth_getFilterChanges", [id]) - .await - .map_err(Into::into)?) + self.request("eth_getFilterChanges", [id]).await } /// Get the storage of an address for a particular slot location @@ -464,11 +414,8 @@ impl Middleware for Provider

{ let from = utils::serialize(&from); let location = utils::serialize(&location); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); - Ok(self - .0 - .request("eth_getStorageAt", [from, location, block]) + self.request("eth_getStorageAt", [from, location, block]) .await - .map_err(Into::into)?) } /// Returns the deployed code at a given address @@ -484,11 +431,7 @@ impl Middleware for Provider

{ let at = utils::serialize(&at); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); - Ok(self - .0 - .request("eth_getCode", [at, block]) - .await - .map_err(Into::into)?) + self.request("eth_getCode", [at, block]).await } ////// Ethereum Naming Service @@ -524,33 +467,21 @@ impl Middleware for Provider

{ /// block(s), as well as the ones that are being scheduled for future execution only. /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) async fn txpool_content(&self) -> Result { - Ok(self - .0 - .request("txpool_content", ()) - .await - .map_err(Into::into)?) + self.request("txpool_content", ()).await } /// Returns a summary of all the transactions currently pending for inclusion in the next /// block(s), as well as the ones that are being scheduled for future execution only. /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) async fn txpool_inspect(&self) -> Result { - Ok(self - .0 - .request("txpool_inspect", ()) - .await - .map_err(Into::into)?) + self.request("txpool_inspect", ()).await } /// Returns the number of transactions currently pending for inclusion in the next block(s), as /// well as the ones that are being scheduled for future execution only. /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status) async fn txpool_status(&self) -> Result { - Ok(self - .0 - .request("txpool_status", ()) - .await - .map_err(Into::into)?) + self.request("txpool_status", ()).await } /// Executes the given call and returns a number of possible traces for it @@ -563,10 +494,7 @@ impl Middleware for Provider

{ let req = utils::serialize(&req); let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest)); let trace_type = utils::serialize(&trace_type); - self.0 - .request("trace_call", [req, trace_type, block]) - .await - .map_err(Into::into) + self.request("trace_call", [req, trace_type, block]).await } /// Traces a call to `eth_sendRawTransaction` without making the call, returning the traces @@ -577,10 +505,8 @@ impl Middleware for Provider

{ ) -> Result { let data = utils::serialize(&data); let trace_type = utils::serialize(&trace_type); - self.0 - .request("trace_rawTransaction", [data, trace_type]) + self.request("trace_rawTransaction", [data, trace_type]) .await - .map_err(Into::into) } /// Replays a transaction, returning the traces @@ -591,11 +517,8 @@ impl Middleware for Provider

{ ) -> Result { let hash = utils::serialize(&hash); let trace_type = utils::serialize(&trace_type); - Ok(self - .0 - .request("trace_replayTransaction", [hash, trace_type]) + self.request("trace_replayTransaction", [hash, trace_type]) .await - .map_err(Into::into)?) } /// Replays all transactions in a block returning the requested traces for each transaction @@ -606,28 +529,20 @@ impl Middleware for Provider

{ ) -> Result, ProviderError> { let block = utils::serialize(&block); let trace_type = utils::serialize(&trace_type); - self.0 - .request("trace_replayBlockTransactions", [block, trace_type]) + self.request("trace_replayBlockTransactions", [block, trace_type]) .await - .map_err(Into::into) } /// Returns traces created at given block async fn trace_block(&self, block: BlockNumber) -> Result, ProviderError> { let block = utils::serialize(&block); - self.0 - .request("trace_block", [block]) - .await - .map_err(Into::into) + self.request("trace_block", [block]).await } /// Return traces matching the given filter async fn trace_filter(&self, filter: TraceFilter) -> Result, ProviderError> { let filter = utils::serialize(&filter); - self.0 - .request("trace_filter", vec![filter]) - .await - .map_err(Into::into) + self.request("trace_filter", vec![filter]).await } /// Returns trace at the given position @@ -639,19 +554,13 @@ impl Middleware for Provider

{ let hash = utils::serialize(&hash); let index: Vec = index.into_iter().map(|i| i.into()).collect(); let index = utils::serialize(&index); - self.0 - .request("trace_get", vec![hash, index]) - .await - .map_err(Into::into) + self.request("trace_get", vec![hash, index]).await } /// Returns all traces of a given transaction async fn trace_transaction(&self, hash: H256) -> Result, ProviderError> { let hash = utils::serialize(&hash); - self.0 - .request("trace_transaction", vec![hash]) - .await - .map_err(Into::into) + self.request("trace_transaction", vec![hash]).await } /// Returns all receipts for that block. Must be done on a parity node. @@ -659,10 +568,8 @@ impl Middleware for Provider

{ &self, block: T, ) -> Result, Self::Error> { - self.0 - .request("parity_getBlockReceipts", vec![block.into()]) + self.request("parity_getBlockReceipts", vec![block.into()]) .await - .map_err(Into::into) } async fn subscribe( @@ -674,11 +581,7 @@ impl Middleware for Provider

{ R: DeserializeOwned + Send + Sync, P: PubsubClient, { - let id: U256 = self - .0 - .request("eth_subscribe", params) - .await - .map_err(Into::into)?; + let id: U256 = self.request("eth_subscribe", params).await?; SubscriptionStream::new(id, self).map_err(Into::into) } @@ -687,12 +590,7 @@ impl Middleware for Provider

{ T: Into + Send + Sync, P: PubsubClient, { - let ok: bool = self - .0 - .request("eth_unsubscribe", [id.into()]) - .await - .map_err(Into::into)?; - Ok(ok) + self.request("eth_unsubscribe", [id.into()]).await } async fn subscribe_blocks( diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 329f16a3..69cdb438 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -6,8 +6,9 @@ use futures_core::stream::Stream; use futures_timer::Delay; use futures_util::{stream, FutureExt, StreamExt}; use pin_project::pin_project; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::{ + fmt::Debug, pin::Pin, task::{Context, Poll}, time::Duration, @@ -75,7 +76,7 @@ where impl<'a, P, R> Stream for FilterWatcher<'a, P, R> where P: JsonRpcClient, - R: Send + Sync + DeserializeOwned + 'a, + R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a, { type Item = R;