Geth TxPool API Support (#86)

* feat: support for txpool API

* feat: add a geth spawner test helper

* fix(txpool): use TxRequest instead of new data struct

The `raw` field is no longer present in latest geth's response.

* fix(txpool): use proper api response format

ref: https://github.com/ethereum/go-ethereum/pull/21720

Also add a Geth test

* ci: install geth 1.9.23

Co-authored-by: Rohit Narurkar <rohit.narurkar@protonmail.com>
This commit is contained in:
Georgios Konstantopoulos 2020-10-24 11:13:13 +03:00 committed by GitHub
parent 938972e9cc
commit a22f1f9aa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 703 additions and 19 deletions

View File

@ -33,6 +33,16 @@ jobs:
export PATH=$HOME/bin:$PATH
solc --version
- name: Install geth
run: |
mkdir -p "$HOME/bin"
wget -q https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.9.23-8c2f2715.tar.gz
tar -xvf geth-linux-amd64-1.9.23-8c2f2715.tar.gz
mv geth-linux-amd64-1.9.23-8c2f2715/geth $HOME/bin/geth
chmod u+x "$HOME/bin/geth"
export PATH=$HOME/bin:$PATH
geth version
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:

View File

@ -27,3 +27,6 @@ pub use ens::NameOrAddress;
mod signature;
pub use signature::*;
mod txpool;
pub use txpool::*;

File diff suppressed because one or more lines are too long

View File

@ -1,9 +1,11 @@
use crate::{types::Address, utils::secret_key_to_address};
use crate::{
types::Address,
utils::{secret_key_to_address, unused_port},
};
use k256::{ecdsa::SigningKey, SecretKey as K256SecretKey};
use rustc_hex::FromHex;
use std::{
io::{BufRead, BufReader},
net::TcpListener,
process::{Child, Command},
time::{Duration, Instant},
};
@ -178,17 +180,3 @@ impl Ganache {
}
}
}
/// A bit of hack to find an unused TCP port.
///
/// Does not guarantee that the given port is unused after the function exists, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub fn unused_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0")
.expect("Failed to create TCP listener to find unused port");
let local_addr = listener
.local_addr()
.expect("Failed to read TCP listener local_addr to find unused port");
local_addr.port()
}

View File

@ -0,0 +1,151 @@
use super::unused_port;
use std::{
io::{BufRead, BufReader},
process::{Child, Command},
time::{Duration, Instant},
};
/// How long we will wait for geth to indicate that it is ready.
const GETH_STARTUP_TIMEOUT_MILLIS: u64 = 10_000;
/// The exposed APIs
const API: &str = "eth,net,web3,txpool";
/// The geth command
const GETH: &str = "geth";
/// A geth instance. Will close the instance when dropped.
///
/// Construct this using [`Geth`](crate::utils::Geth)
pub struct GethInstance {
pid: Child,
port: u16,
}
impl GethInstance {
/// Returns the port of this instance
pub fn port(&self) -> u16 {
self.port
}
/// Returns the HTTP endpoint of this instance
pub fn endpoint(&self) -> String {
format!("http://localhost:{}", self.port)
}
/// Returns the Websocket endpoint of this instance
pub fn ws_endpoint(&self) -> String {
format!("ws://localhost:{}", self.port)
}
}
impl Drop for GethInstance {
fn drop(&mut self) {
let _ = self.pid.kill().expect("could not kill geth");
}
}
/// Builder for launching `geth`.
///
/// # Panics
///
/// If `spawn` is called without `geth` being available in the user's $PATH
///
/// # Example
///
/// ```no_run
/// use ethers::utils::Geth;
///
/// let port = 8545u16;
/// let url = format!("http://localhost:{}", port).to_string();
///
/// let geth = Geth::new()
/// .port(port)
/// .block_time(5000u64)
/// .spawn();
///
/// drop(geth); // this will kill the instance
/// ```
#[derive(Clone, Default)]
pub struct Geth {
port: Option<u16>,
block_time: Option<u64>,
}
impl Geth {
/// Creates an empty Geth builder.
/// The default port is 8545. The mnemonic is chosen randomly.
pub fn new() -> Self {
Self::default()
}
/// Sets the port which will be used when the `geth-cli` instance is launched.
pub fn port<T: Into<u16>>(mut self, port: T) -> Self {
self.port = Some(port.into());
self
}
/// Sets the block-time which will be used when the `geth-cli` instance is launched.
pub fn block_time<T: Into<u64>>(mut self, block_time: T) -> Self {
self.block_time = Some(block_time.into());
self
}
/// Consumes the builder and spawns `geth` with stdout redirected
/// to /dev/null.
pub fn spawn(self) -> GethInstance {
let mut cmd = Command::new(GETH);
// geth uses stderr for its logs
cmd.stderr(std::process::Stdio::piped());
let port = if let Some(port) = self.port {
port
} else {
unused_port()
};
// Open the HTTP API
cmd.arg("--http");
cmd.arg("--http.port").arg(port.to_string());
cmd.arg("--http.api").arg(API);
// Open the WS API
cmd.arg("--ws");
cmd.arg("--ws.port").arg(port.to_string());
cmd.arg("--ws.api").arg(API);
// Dev mode with custom block time
cmd.arg("--dev");
if let Some(block_time) = self.block_time {
cmd.arg("--dev.period").arg(block_time.to_string());
}
let mut child = cmd.spawn().expect("couldnt start geth");
let stdout = child
.stderr
.expect("Unable to get stderr for geth child process");
let start = Instant::now();
let mut reader = BufReader::new(stdout);
loop {
if start + Duration::from_millis(GETH_STARTUP_TIMEOUT_MILLIS) <= Instant::now() {
panic!("Timed out waiting for geth to start. Is geth installed?")
}
let mut line = String::new();
reader
.read_line(&mut line)
.expect("Failed to read line from geth process");
// geth 1.9.23 uses "server started" while 1.9.18 uses "endpoint opened"
if line.contains("HTTP endpoint opened") || line.contains("HTTP server started") {
break;
}
}
child.stderr = Some(reader.into_inner());
GethInstance { pid: child, port }
}
}

View File

@ -4,6 +4,12 @@ mod ganache;
#[cfg(not(target_arch = "wasm32"))]
pub use ganache::{Ganache, GanacheInstance};
/// Utilities for launching a go-ethereum dev-mode instance
#[cfg(not(target_arch = "wasm32"))]
mod geth;
#[cfg(not(target_arch = "wasm32"))]
pub use geth::{Geth, GethInstance};
/// Solidity compiler bindings
#[cfg(not(target_arch = "wasm32"))]
mod solc;
@ -126,7 +132,7 @@ pub fn to_checksum(addr: &Address, chain_id: Option<u8>) -> String {
let addr_hex = addr_hex.as_bytes();
addr_hex
.into_iter()
.iter()
.zip(hash)
.fold("0x".to_owned(), |mut encoded, (addr, hash)| {
encoded.push(if *hash >= 56 {
@ -138,6 +144,20 @@ pub fn to_checksum(addr: &Address, chain_id: Option<u8>) -> String {
})
}
/// A bit of hack to find an unused TCP port.
///
/// Does not guarantee that the given port is unused after the function exists, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub(crate) fn unused_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0")
.expect("Failed to create TCP listener to find unused port");
let local_addr = listener
.local_addr()
.expect("Failed to read TCP listener local_addr to find unused port");
local_addr.port()
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -358,4 +358,16 @@ pub trait Middleware: Sync + Send + Debug {
fn pending_transaction(&self, tx_hash: TxHash) -> PendingTransaction<'_, Self::Provider> {
self.inner().pending_transaction(tx_hash)
}
async fn txpool_content(&self) -> Result<TxpoolContent, Self::Error> {
self.inner().txpool_content().await.map_err(FromErr::from)
}
async fn txpool_inspect(&self) -> Result<TxpoolInspect, Self::Error> {
self.inner().txpool_inspect().await.map_err(FromErr::from)
}
async fn txpool_status(&self) -> Result<TxpoolStatus, Self::Error> {
self.inner().txpool_status().await.map_err(FromErr::from)
}
}

View File

@ -8,7 +8,8 @@ use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Address, Block, BlockId, BlockNumber, Bytes, Filter, Log, NameOrAddress, Selector,
Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, H256, U256, U64,
Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent,
TxpoolInspect, TxpoolStatus, H256, U256, U64,
},
utils,
};
@ -507,6 +508,39 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
fn pending_transaction(&self, tx_hash: TxHash) -> PendingTransaction<'_, P> {
PendingTransaction::new(tx_hash, self).interval(self.get_interval())
}
/// Returns the details of all transactions currently pending for inclusion in the next
/// block(s), as well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content)
async fn txpool_content(&self) -> Result<TxpoolContent, ProviderError> {
Ok(self
.0
.request("txpool_content", ())
.await
.map_err(Into::into)?)
}
/// Returns a summary of all the transactions currently pending for inclusion in the next
/// block(s), as well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect)
async fn txpool_inspect(&self) -> Result<TxpoolInspect, ProviderError> {
Ok(self
.0
.request("txpool_inspect", ())
.await
.map_err(Into::into)?)
}
/// Returns the number of transactions currently pending for inclusion in the next block(s), as
/// well as the ones that are being scheduled for future execution only.
/// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
async fn txpool_status(&self) -> Result<TxpoolStatus, ProviderError> {
Ok(self
.0
.request("txpool_status", ())
.await
.map_err(Into::into)?)
}
}
impl<P: JsonRpcClient> Provider<P> {

View File

@ -1,4 +1,3 @@
#![allow(unused_braces)]
use ethers::providers::{Http, Middleware, Provider};
use std::{convert::TryFrom, time::Duration};

View File

@ -0,0 +1,56 @@
use ethers::{
providers::{Http, Middleware, Provider},
types::TransactionRequest,
utils::Geth,
};
use std::convert::TryFrom;
#[tokio::test]
async fn txpool() {
let geth = Geth::new().block_time(20u64).spawn();
let provider = Provider::<Http>::try_from(geth.endpoint()).unwrap();
let account = provider.get_accounts().await.unwrap()[0].clone();
let value: u64 = 42;
let gas_price = ethers::types::U256::from_dec_str("221435145689").unwrap();
let mut tx = TransactionRequest::new()
.to(account)
.from(account)
.value(value)
.gas_price(gas_price);
// send a few transactions
let mut txs = Vec::new();
for _ in 0..10 {
let tx_hash = provider.send_transaction(tx.clone(), None).await.unwrap();
txs.push(tx_hash);
}
// we gave a 20s block time, should be plenty for us to get the txpool's content
let status = provider.txpool_status().await.unwrap();
assert_eq!(status.pending.as_u64(), 10);
assert_eq!(status.queued.as_u64(), 0);
let inspect = provider.txpool_inspect().await.unwrap();
assert!(inspect.queued.is_empty());
let summary = inspect.pending.get(&account).unwrap();
for i in 0..10 {
let tx_summary = summary.get(&i.to_string()).unwrap();
assert_eq!(tx_summary.gas_price, gas_price);
assert_eq!(tx_summary.value, value.into());
assert_eq!(tx_summary.gas, 21000.into());
assert_eq!(tx_summary.to.unwrap(), account);
}
let content = provider.txpool_content().await.unwrap();
assert!(content.queued.is_empty());
let content = content.pending.get(&account).unwrap();
// the txs get their gas and nonce auto-set upon mempool entry
tx = tx.gas(21000);
for i in 0..10 {
tx = tx.nonce(i);
let req = content.get(&i.to_string()).unwrap();
assert_eq!(req, &tx);
}
}