Skip to content

Commit 9cc8599

Browse files
authored
Merge pull request #31 from loadnetwork/helios
chore: refactors
2 parents d972ea9 + b744ce5 commit 9cc8599

File tree

6 files changed

+263
-241
lines changed

6 files changed

+263
-241
lines changed

native/helios_nif/src/core/light_client.rs

Lines changed: 8 additions & 239 deletions
Original file line numberDiff line numberDiff line change
@@ -1,148 +1,18 @@
1-
use alloy::{
2-
eips::BlockId,
3-
primitives::{Address, B256, Bytes, U256},
4-
rpc::types::{Filter, Log},
5-
};
1+
use alloy::eips::BlockId;
62
use anyhow::Error;
7-
use helios::ethereum::{EthereumClientBuilder, config::networks::Network};
8-
use hyper::{
9-
Body, Method, Request, Response, Server, StatusCode,
10-
service::{make_service_fn, service_fn},
11-
};
12-
use serde::{Deserialize, Serialize};
3+
use hyper::{Body, Method, Request, Response, StatusCode};
134
use serde_json::{Value, json};
145
use std::convert::Infallible;
15-
use std::net::SocketAddr;
16-
use std::path::PathBuf;
17-
use std::str::FromStr;
186
use std::sync::{Arc, Mutex};
19-
use tokio::runtime::Runtime;
20-
use tokio::sync::oneshot;
217
use tracing::{error, info};
22-
use tracing_subscriber::FmtSubscriber;
23-
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
24-
25-
#[derive(Deserialize, Debug)]
26-
struct JsonRpcRequest {
27-
jsonrpc: String,
28-
method: String,
29-
params: Vec<Value>,
30-
id: Value,
31-
}
32-
33-
#[derive(Serialize)]
34-
struct JsonRpcResponse {
35-
jsonrpc: String,
36-
result: Value,
37-
id: Value,
38-
}
39-
40-
#[derive(Serialize)]
41-
struct JsonRpcErrorResponse {
42-
jsonrpc: String,
43-
error: JsonRpcError,
44-
id: Value,
45-
}
46-
47-
#[derive(Serialize)]
48-
struct JsonRpcError {
49-
code: i32,
50-
message: String,
51-
}
52-
53-
struct ServerState {
54-
client: Arc<helios::ethereum::EthereumClient>,
55-
shutdown_signal: Option<oneshot::Receiver<()>>,
56-
}
57-
58-
fn parse_block_id(value: &Value) -> Result<BlockId, Error> {
59-
match value {
60-
Value::String(s) => {
61-
if s == "latest" {
62-
Ok(BlockId::latest())
63-
} else if s == "earliest" {
64-
Ok(BlockId::earliest())
65-
} else if s == "pending" {
66-
Ok(BlockId::pending())
67-
} else if s == "safe" {
68-
Ok(BlockId::safe())
69-
} else if s == "finalized" {
70-
Ok(BlockId::finalized())
71-
} else if s.starts_with("0x") {
72-
// Try to parse as block hash
73-
let hash = B256::from_str(s)
74-
.map_err(|e| Error::msg(format!("Invalid block hash: {}", e)))?;
75-
Ok(BlockId::Hash(hash.into()))
76-
} else {
77-
// Try to parse as block number
78-
let num = s
79-
.parse::<u64>()
80-
.map_err(|e| Error::msg(format!("Invalid block number: {}", e)))?;
81-
Ok(BlockId::Number(num.into()))
82-
}
83-
}
84-
Value::Number(n) => {
85-
if let Some(num) = n.as_u64() {
86-
Ok(BlockId::Number(num.into()))
87-
} else {
88-
Err(Error::msg("Invalid block number"))
89-
}
90-
}
91-
_ => Err(Error::msg("Invalid block identifier")),
92-
}
93-
}
94-
95-
fn parse_address(value: &Value) -> Result<Address, Error> {
96-
match value {
97-
Value::String(s) => {
98-
Address::from_str(s).map_err(|e| Error::msg(format!("Invalid address: {}", e)))
99-
}
100-
_ => Err(Error::msg("Invalid address format")),
101-
}
102-
}
103-
104-
fn parse_b256(value: &Value) -> Result<B256, Error> {
105-
match value {
106-
Value::String(s) => {
107-
B256::from_str(s).map_err(|e| Error::msg(format!("Invalid hash: {}", e)))
108-
}
109-
_ => Err(Error::msg("Invalid hash format")),
110-
}
111-
}
112-
113-
fn parse_u256(value: &Value) -> Result<U256, Error> {
114-
match value {
115-
Value::String(s) => {
116-
U256::from_str(s).map_err(|e| Error::msg(format!("Invalid U256: {}", e)))
117-
}
118-
Value::Number(n) => {
119-
if let Some(num) = n.as_u64() {
120-
Ok(U256::from(num))
121-
} else {
122-
Err(Error::msg("Invalid U256 number"))
123-
}
124-
}
125-
_ => Err(Error::msg("Invalid U256 format")),
126-
}
127-
}
1288

129-
fn parse_bytes(value: &Value) -> Result<Bytes, Error> {
130-
match value {
131-
Value::String(s) => {
132-
if s.starts_with("0x") {
133-
let s = s.trim_start_matches("0x");
134-
let bytes =
135-
hex::decode(s).map_err(|e| Error::msg(format!("Invalid hex: {}", e)))?;
136-
Ok(Bytes::from(bytes))
137-
} else {
138-
Err(Error::msg("Hex string must start with 0x"))
139-
}
140-
}
141-
_ => Err(Error::msg("Invalid bytes format")),
142-
}
143-
}
9+
use crate::core::server::create_error_response;
10+
use crate::core::types::{
11+
JsonRpcError, JsonRpcErrorResponse, JsonRpcRequest, JsonRpcResponse, ServerState,
12+
};
13+
use crate::core::utils::{parse_address, parse_b256, parse_block_id, parse_bytes, parse_u256};
14414

145-
async fn handle_request(
15+
pub async fn handle_request(
14616
req: Request<Body>,
14717
state: Arc<Mutex<ServerState>>,
14818
) -> Result<Response<Body>, Infallible> {
@@ -441,104 +311,3 @@ async fn handle_request(
441311

442312
Ok(response)
443313
}
444-
445-
fn create_error_response(
446-
code: i32,
447-
message: &str,
448-
id: Value,
449-
) -> Result<Response<Body>, Infallible> {
450-
let error_response = JsonRpcErrorResponse {
451-
jsonrpc: "2.0".to_string(),
452-
error: JsonRpcError {
453-
code,
454-
message: message.to_string(),
455-
},
456-
id,
457-
};
458-
459-
let mut response = Response::new(Body::from(serde_json::to_string(&error_response).unwrap()));
460-
response.headers_mut().insert(
461-
hyper::header::CONTENT_TYPE,
462-
hyper::header::HeaderValue::from_static("application/json"),
463-
);
464-
465-
Ok(response)
466-
}
467-
468-
pub fn start_server(
469-
address: SocketAddr,
470-
execution_rpc: String,
471-
consensus_rpc: String,
472-
data_dir: PathBuf,
473-
shutdown_rx: oneshot::Receiver<()>,
474-
) -> Result<Runtime, Error> {
475-
let env_filter = EnvFilter::builder()
476-
.with_default_directive(LevelFilter::INFO.into())
477-
.from_env_lossy();
478-
479-
let subscriber = FmtSubscriber::builder()
480-
.with_env_filter(env_filter)
481-
.finish();
482-
483-
let _ = tracing::subscriber::set_global_default(subscriber);
484-
485-
let runtime = Runtime::new()?;
486-
487-
let client = runtime.block_on(async {
488-
info!(target: "helios::server", "Building Helios client...");
489-
490-
let client = EthereumClientBuilder::new()
491-
.network(Network::Mainnet)
492-
.execution_rpc(&execution_rpc)
493-
.unwrap()
494-
.consensus_rpc(&consensus_rpc)
495-
.unwrap()
496-
.data_dir(data_dir)
497-
.with_file_db()
498-
.build()
499-
.unwrap();
500-
501-
info!(target: "helios::server", "Helios client built successfully");
502-
503-
Ok::<_, Error>(client)
504-
})?;
505-
506-
let state = Arc::new(Mutex::new(ServerState {
507-
client: Arc::new(client),
508-
shutdown_signal: Some(shutdown_rx),
509-
}));
510-
511-
let state_clone = state.clone();
512-
513-
runtime.spawn(async move {
514-
info!(target: "helios::server", "Starting HTTP server on {}", address);
515-
516-
let make_svc = make_service_fn(move |_conn| {
517-
let state = state_clone.clone();
518-
async move {
519-
Ok::<_, Infallible>(service_fn(move |req| {
520-
handle_request(req, state.clone())
521-
}))
522-
}
523-
});
524-
525-
let server = Server::bind(&address).serve(make_svc);
526-
527-
let shutdown_signal = {
528-
let mut state = state.lock().unwrap();
529-
state.shutdown_signal.take().unwrap()
530-
};
531-
532-
let server = server.with_graceful_shutdown(async {
533-
shutdown_signal.await.ok();
534-
info!(target: "helios::server", "Server shutting down gracefully");
535-
});
536-
537-
info!(target: "helios::server", "HTTP server running at {}", address);
538-
if let Err(e) = server.await {
539-
error!(target: "helios::server", "Server error: {}", e);
540-
}
541-
});
542-
543-
Ok(runtime)
544-
}

native/helios_nif/src/core/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
pub mod light_client;
2+
pub mod server;
3+
pub mod types;
4+
pub mod utils;
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use anyhow::Error;
2+
use helios::ethereum::{EthereumClientBuilder, config::networks::Network};
3+
use hyper::{
4+
Body, Response, Server,
5+
service::{make_service_fn, service_fn},
6+
};
7+
use serde_json::Value;
8+
use std::convert::Infallible;
9+
use std::net::SocketAddr;
10+
use std::path::PathBuf;
11+
use std::sync::{Arc, Mutex};
12+
use tokio::runtime::Runtime;
13+
use tokio::sync::oneshot;
14+
use tracing::{error, info};
15+
use tracing_subscriber::FmtSubscriber;
16+
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
17+
18+
use crate::core::light_client::handle_request;
19+
use crate::core::types::{JsonRpcError, JsonRpcErrorResponse, ServerState};
20+
21+
pub fn create_error_response(
22+
code: i32,
23+
message: &str,
24+
id: Value,
25+
) -> Result<Response<Body>, Infallible> {
26+
let error_response = JsonRpcErrorResponse {
27+
jsonrpc: "2.0".to_string(),
28+
error: JsonRpcError {
29+
code,
30+
message: message.to_string(),
31+
},
32+
id,
33+
};
34+
35+
let mut response = Response::new(Body::from(serde_json::to_string(&error_response).unwrap()));
36+
response.headers_mut().insert(
37+
hyper::header::CONTENT_TYPE,
38+
hyper::header::HeaderValue::from_static("application/json"),
39+
);
40+
41+
Ok(response)
42+
}
43+
44+
pub fn start_server(
45+
address: SocketAddr,
46+
execution_rpc: String,
47+
consensus_rpc: String,
48+
data_dir: PathBuf,
49+
shutdown_rx: oneshot::Receiver<()>,
50+
) -> Result<Runtime, Error> {
51+
let env_filter = EnvFilter::builder()
52+
.with_default_directive(LevelFilter::INFO.into())
53+
.from_env_lossy();
54+
55+
let subscriber = FmtSubscriber::builder()
56+
.with_env_filter(env_filter)
57+
.finish();
58+
59+
let _ = tracing::subscriber::set_global_default(subscriber);
60+
61+
let runtime = Runtime::new()?;
62+
63+
let client = runtime.block_on(async {
64+
info!(target: "helios::server", "Building Helios client...");
65+
66+
let client = EthereumClientBuilder::new()
67+
.network(Network::Mainnet)
68+
.execution_rpc(&execution_rpc)
69+
.unwrap()
70+
.consensus_rpc(&consensus_rpc)
71+
.unwrap()
72+
.data_dir(data_dir)
73+
.with_file_db()
74+
.build()
75+
.unwrap();
76+
77+
info!(target: "helios::server", "Helios client built successfully");
78+
79+
Ok::<_, Error>(client)
80+
})?;
81+
82+
let state = Arc::new(Mutex::new(ServerState {
83+
client: Arc::new(client),
84+
shutdown_signal: Some(shutdown_rx),
85+
}));
86+
87+
let state_clone = state.clone();
88+
89+
runtime.spawn(async move {
90+
info!(target: "helios::server", "Starting HTTP server on {}", address);
91+
92+
let make_svc = make_service_fn(move |_conn| {
93+
let state = state_clone.clone();
94+
async move {
95+
Ok::<_, Infallible>(service_fn(move |req| {
96+
handle_request(req, state.clone())
97+
}))
98+
}
99+
});
100+
101+
let server = Server::bind(&address).serve(make_svc);
102+
let shutdown_signal = {
103+
let mut state = state.lock().unwrap();
104+
state.shutdown_signal.take().unwrap()
105+
};
106+
107+
let server = server.with_graceful_shutdown(async {
108+
shutdown_signal.await.ok();
109+
info!(target: "helios::server", "Server shutting down gracefully");
110+
});
111+
112+
info!(target: "helios::server", "HTTP server running at {}", address);
113+
if let Err(e) = server.await {
114+
error!(target: "helios::server", "Server error: {}", e);
115+
}
116+
});
117+
118+
Ok(runtime)
119+
}

0 commit comments

Comments
 (0)