-
Notifications
You must be signed in to change notification settings - Fork 36
/
block_collector.rs
53 lines (49 loc) · 1.48 KB
/
block_collector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use anyhow::Result;
use artemis_core::types::{Collector, CollectorStream};
use async_trait::async_trait;
use ethers::{
prelude::Middleware,
providers::PubsubClient,
types::{H256, U256, U64},
};
use std::sync::Arc;
use tokio_stream::StreamExt;
/// A collector that listens for new blocks, and generates a stream of
/// [events](NewBlock) which contain the block number and hash.
pub struct BlockCollector<M> {
provider: Arc<M>,
}
/// A new block event, containing the block number and hash.
#[derive(Debug, Clone)]
pub struct NewBlock {
pub hash: H256,
pub number: U64,
pub timestamp: U256,
}
impl<M> BlockCollector<M> {
pub fn new(provider: Arc<M>) -> Self {
Self { provider }
}
}
/// Implementation of the [Collector](Collector) trait for the [BlockCollector](BlockCollector).
/// This implementation uses the [PubsubClient](PubsubClient) to subscribe to new blocks.
#[async_trait]
impl<M> Collector<NewBlock> for BlockCollector<M>
where
M: Middleware,
M::Provider: PubsubClient,
M::Error: 'static,
{
async fn get_event_stream(&self) -> Result<CollectorStream<'_, NewBlock>> {
let stream = self.provider.subscribe_blocks().await?;
let stream = stream.filter_map(|block| match block.hash {
Some(hash) => block.number.map(|number| NewBlock {
hash,
number,
timestamp: block.timestamp,
}),
None => None,
});
Ok(Box::pin(stream))
}
}