Skip to content

Commit 0bbf11c

Browse files
authored
Merge pull request #28 from sergerad/main
Support multiple monitors in rbtr with async actor
2 parents 5efdc12 + 77a5f0e commit 0bbf11c

File tree

11 files changed

+148
-61
lines changed

11 files changed

+148
-61
lines changed

Cargo.toml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ members = [
66
"crates/tokens",
77
"crates/uniswapv3pool",
88
"crates/lib",
9-
"bin/rbtr", "tests",
9+
"bin/rbtr",
10+
"tests",
1011
]
1112

1213
[workspace.dependencies]
13-
uniswap-v3-sdk = { version = "2.2.0", features = ["extensions", "std"] }
14-
uniswap-sdk-core = "3.0.0"
14+
uniswap-v3-sdk = { version = "2.6.0", features = ["extensions", "std"] }
15+
uniswap-sdk-core = "3.2.0"
1516
eyre = "0.6.12"
16-
alloy = { version = "0.5", features = ["contract"] }
17+
alloy = { version = "0.6", features = ["contract"] }
1718
alloy-primitives = "0.8"
1819
alloy-sol-types = "0.8"
19-
tokio = { version = "1.40", features = ["full"] }
20+
tokio = { version = "1.41.1", features = ["full"] }
2021
toml = "0.8.19"
21-
serde = { version = "1.0.214", features = ["derive"] }
22+
serde = { version = "1.0.215", features = ["derive"] }
2223

2324
tokens = { path = "./crates/tokens" }
2425
uniswapv3pool = { path = "./crates/uniswapv3pool" }

bin/rbtr/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ serde = { workspace = true }
99
eyre = { workspace = true }
1010
alloy = { workspace = true }
1111
tokio = { workspace = true }
12+
uniswap-v3-sdk = { workspace = true }
1213
tokio-util = "0.7.12"
1314
actix = "0.13.5"
1415
ctrlc = "3.1.0"
16+
uniswapv3pool.workspace = true
17+
duration-string = { version = "0.4.0", features = ["serde"] }
1518

1619
lib = { path = "../../crates/lib" }

bin/rbtr/Config.toml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
1-
url_one = "https://rpc.immutable.com"
2-
url_two = "https://rpc.immutable.com"
3-
token_one = "0xD67cc11151dBccCC424A16F8963ece3D0539BD61"
4-
token_two = "0xD67cc11151dBccCC424A16F8963ece3D0539BD61"
1+
tick_rate = "5s"
2+
3+
[[monitors]]
4+
rpc_url = "https://rpc.immutable.com"
5+
factory = "0x56c2162254b0E4417288786eE402c2B41d4e181e"
6+
token_one = "0x52A6c53869Ce09a731CD772f245b97A4401d3348"
7+
token_two = "0x3A0C2Ba54D6CBd3121F01b96dFd20e99D1696C9D"
8+
9+
[[monitors]]
10+
rpc_url = "https://rpc.immutable.com"
11+
factory = "0x56c2162254b0E4417288786eE402c2B41d4e181e"
12+
token_one = "0x52A6c53869Ce09a731CD772f245b97A4401d3348"
13+
token_two = "0x3A0C2Ba54D6CBd3121F01b96dFd20e99D1696C9D"

bin/rbtr/src/actors/messages.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use actix::prelude::*;
2+
use uniswapv3pool::UniswapV3PoolSdk;
23

34
/// Control is a message to control the operation of an Actor.
45
#[derive(Debug, Message)]
@@ -12,8 +13,8 @@ pub enum Control {
1213
#[derive(Debug, Message)]
1314
#[rtype(result = "()")]
1415
#[allow(dead_code)]
15-
pub struct PriceDiff {
16-
pub diff: f64,
16+
pub struct PoolUpdate {
17+
pub pool: UniswapV3PoolSdk,
1718
}
1819

1920
/// Message to subscribe to some other message.

bin/rbtr/src/actors/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod messages;
2-
pub use messages::{Control, PriceDiff, Subscribe};
2+
pub use messages::{Control, PoolUpdate, Subscribe};
33

44
mod monitor;
55
pub use monitor::Monitor;

bin/rbtr/src/actors/monitor.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,35 @@
1-
use crate::{PriceDiff, Subscribe};
1+
use crate::{PoolUpdate, Subscribe};
22
use actix::prelude::*;
3+
use lib::prelude::{Address, Provider, RootProvider};
4+
use uniswap_v3_sdk::prelude::FeeAmount;
5+
use uniswapv3pool::UniswapV3PoolSdk;
36

47
use super::Control;
58

69
/// Monitor is the actor responsible for monitoring on-chain state
710
/// and notifying subscribers of changes.
11+
#[derive(Clone)]
812
pub struct Monitor {
9-
price_diff_subs: Vec<Recipient<PriceDiff>>,
13+
price_diff_subs: Vec<Recipient<PoolUpdate>>,
14+
provider: RootProvider,
15+
factory: Address,
16+
token_0: Address,
17+
token_1: Address,
1018
}
1119

1220
impl Monitor {
13-
pub fn new() -> Self {
21+
pub fn new(
22+
provider: RootProvider,
23+
factory: Address,
24+
token_0: Address,
25+
token_1: Address,
26+
) -> Self {
1427
Monitor {
1528
price_diff_subs: vec![],
29+
provider,
30+
factory,
31+
token_0,
32+
token_1,
1633
}
1734
}
1835
}
@@ -21,10 +38,10 @@ impl Actor for Monitor {
2138
type Context = Context<Self>;
2239
}
2340

24-
impl Handler<Subscribe<PriceDiff>> for Monitor {
41+
impl Handler<Subscribe<PoolUpdate>> for Monitor {
2542
type Result = ();
2643

27-
fn handle(&mut self, msg: Subscribe<PriceDiff>, _: &mut Self::Context) {
44+
fn handle(&mut self, msg: Subscribe<PoolUpdate>, _: &mut Self::Context) {
2845
self.price_diff_subs.push(msg.0);
2946
}
3047
}
@@ -39,10 +56,25 @@ impl Handler<Control> for Monitor {
3956
ctx.stop();
4057
}
4158
Control::Tick => {
42-
for sub in &self.price_diff_subs {
43-
// TODO: handle error
44-
sub.do_send(PriceDiff { diff: 0.1 });
59+
let self_clone = self.clone();
60+
async move {
61+
let pool = UniswapV3PoolSdk::from_pool_key(
62+
self_clone.provider.get_chain_id().await.unwrap(),
63+
self_clone.factory,
64+
self_clone.token_0,
65+
self_clone.token_1,
66+
FeeAmount::LOW, // TODO: parameterize
67+
self_clone.provider,
68+
None,
69+
)
70+
.await
71+
.unwrap();
72+
for sub in self_clone.price_diff_subs {
73+
sub.do_send(PoolUpdate { pool: pool.clone() });
74+
}
4575
}
76+
.into_actor(self)
77+
.wait(ctx);
4678
}
4779
}
4880
}

bin/rbtr/src/actors/resolver.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use actix::prelude::*;
22

3-
use super::{Control, PriceDiff};
3+
use super::{Control, PoolUpdate};
44

55
/// Resolver is the actor responsible for handling all data and events
66
/// derived from on-chain state that other actors may provide.
@@ -16,10 +16,10 @@ impl Actor for Resolver {
1616
type Context = Context<Self>;
1717
}
1818

19-
impl Handler<PriceDiff> for Resolver {
19+
impl Handler<PoolUpdate> for Resolver {
2020
type Result = ();
2121

22-
fn handle(&mut self, msg: PriceDiff, _: &mut Self::Context) -> Self::Result {
22+
fn handle(&mut self, msg: PoolUpdate, _: &mut Self::Context) -> Self::Result {
2323
println!("Price diff: {msg:?}");
2424
}
2525
}

bin/rbtr/src/config.rs

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,58 @@
1+
use duration_string::DurationString;
12
use eyre::{Context, Result};
23
use lib::prelude::*;
34
use serde::Deserialize;
4-
use std::{fs, path::Path};
5+
use std::{fs, path::Path, time::Duration};
56

67
#[derive(Deserialize)]
78
struct RawConfig {
8-
url_one: String,
9-
url_two: String,
9+
tick_rate: DurationString,
10+
monitors: Vec<RawMonitorConfig>,
11+
}
12+
13+
#[derive(Deserialize)]
14+
struct RawMonitorConfig {
15+
rpc_url: String,
16+
factory: Address,
1017
token_one: Address,
1118
token_two: Address,
1219
}
1320

14-
// TODO: remove allow when this is used
15-
#[allow(dead_code)]
1621
#[derive(Debug)]
1722
pub struct Config {
18-
provider_one: RootProvider,
19-
provider_two: RootProvider,
20-
token_address_one: Address,
21-
token_address_two: Address,
23+
pub tick_rate: Duration,
24+
pub monitors: Vec<MonitorConfig>,
25+
}
26+
27+
impl From<RawConfig> for Config {
28+
fn from(raw: RawConfig) -> Self {
29+
let monitors = raw.monitors.into_iter().map(MonitorConfig::from).collect();
30+
Self {
31+
tick_rate: raw.tick_rate.into(),
32+
monitors,
33+
}
34+
}
35+
}
36+
37+
#[derive(Debug)]
38+
pub struct MonitorConfig {
39+
pub provider: RootProvider,
40+
pub factory: Address,
41+
pub token_one: Address,
42+
pub token_two: Address,
43+
}
44+
45+
impl From<RawMonitorConfig> for MonitorConfig {
46+
fn from(raw: RawMonitorConfig) -> Self {
47+
let rpc_url = Url::parse(&raw.rpc_url).unwrap();
48+
let provider = ProviderBuilder::new().on_http(rpc_url);
49+
Self {
50+
provider,
51+
factory: raw.factory,
52+
token_one: raw.token_one,
53+
token_two: raw.token_two,
54+
}
55+
}
2256
}
2357

2458
impl Config {
@@ -29,19 +63,6 @@ impl Config {
2963
let raw =
3064
toml::from_str::<RawConfig>(&file_str).wrap_err("failed to parse config from toml")?;
3165

32-
// Parse URLs and create the providers.
33-
let url_one =
34-
Url::parse(&raw.url_one).wrap_err(format!("failed to parse url: {}", raw.url_one))?;
35-
let url_two =
36-
Url::parse(&raw.url_two).wrap_err(format!("failed to parse url: {}", raw.url_two))?;
37-
let provider_one = ProviderBuilder::new().on_http(url_one);
38-
let provider_two = ProviderBuilder::new().on_http(url_two);
39-
40-
Ok(Self {
41-
provider_one,
42-
provider_two,
43-
token_address_one: raw.token_one,
44-
token_address_two: raw.token_two,
45-
})
66+
Ok(raw.into())
4667
}
4768
}

bin/rbtr/src/main.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
mod config;
2-
use std::time::Duration;
32

43
use actix::prelude::*;
54
use config::Config;
@@ -28,23 +27,33 @@ fn main() -> Result<()> {
2827

2928
// Block the main thread and run the actor system.
3029
system.block_on(async {
31-
// Set up actors and their subscriptions
30+
// Set up actors and their subscriptions.
3231
let resolver = Resolver::new().start();
33-
let monitor = Monitor::new().start();
34-
let subscribe = Subscribe::<PriceDiff>(resolver.clone().recipient());
35-
monitor
36-
.send(subscribe)
37-
.await
38-
.expect("Failed to set initial subscription to Monitor");
32+
33+
// Setup up the monitors.
34+
let mut monitors = vec![];
35+
for monitor_config in config.monitors.iter() {
36+
let monitor = Monitor::new(
37+
monitor_config.provider.clone(),
38+
monitor_config.factory,
39+
monitor_config.token_one,
40+
monitor_config.token_two,
41+
)
42+
.start();
43+
subscribe_to_monitor(monitor.clone(), resolver.clone()).await;
44+
monitors.push(monitor);
45+
}
3946

4047
// Drive the actors.
41-
let mut interval = tokio::time::interval(Duration::from_secs(5));
48+
let mut interval = tokio::time::interval(config.tick_rate);
4249
loop {
4350
select! {
4451
// Shutdown.
4552
_ = cancel.cancelled() => {
46-
if let Err(e) = monitor.send(Control::Stop).await {
47-
eprintln!("Failed to send Stop to Monitor: {e}");
53+
for monitor in monitors.into_iter() {
54+
if let Err(e) = monitor.send(Control::Stop).await {
55+
eprintln!("Failed to send Stop to Monitor: {e}");
56+
}
4857
}
4958
if let Err(e) = resolver.send(Control::Stop).await {
5059
eprintln!("Failed to send Stop to Resolver: {e}");
@@ -54,8 +63,10 @@ fn main() -> Result<()> {
5463
}
5564
// Tick.
5665
_ = interval.tick() => {
57-
if let Err(e) = monitor.send(Control::Tick).await {
58-
eprintln!("Failed to send Tick to Monitor: {e}");
66+
for monitor in monitors.iter() {
67+
if let Err(e) = monitor.send(Control::Tick).await {
68+
eprintln!("Failed to send Tick to Monitor: {e}");
69+
}
5970
}
6071
}
6172
}
@@ -67,3 +78,11 @@ fn main() -> Result<()> {
6778
println!("Shutting down");
6879
Ok(())
6980
}
81+
82+
async fn subscribe_to_monitor(monitor: Addr<Monitor>, resolver: Addr<Resolver>) {
83+
let subscribe = Subscribe::<PoolUpdate>(resolver.recipient());
84+
monitor
85+
.send(subscribe)
86+
.await
87+
.expect("Failed to set subscription to Monitor");
88+
}

crates/uniswapv3pool/src/univ3sdk.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use eyre::Result;
1414

1515
use lib::prelude::*;
1616

17+
#[derive(Debug, Clone)]
1718
pub struct UniswapV3PoolSdk {
1819
pub pool: Pool,
1920
pub tick_data_provider: EphemeralTickDataProvider,

0 commit comments

Comments
 (0)