4 changes: 4 additions & 0 deletions .github/workflows/build.yml
@@ -28,6 +28,7 @@ jobs:
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ matrix.toolchain }}
rustup override set ${{ matrix.toolchain }}
if [ "${{ matrix.msrv }}" = "true" ]; then rustup component add clippy; fi
- name: Check formatting
if: matrix.check-fmt
run: rustup component add rustfmt && cargo fmt --all -- --check
@@ -37,6 +38,9 @@ jobs:
echo "No packages need pinning for MSRV ${{ matrix.toolchain }}"
- name: Build on Rust ${{ matrix.toolchain }}
run: cargo build --verbose --color always
+ - name: Check clippy if on msrv
+   if: matrix.msrv
+   run: cargo clippy --all-features -- -D warnings
- name: Test on Rust ${{ matrix.toolchain }}
run: cargo test
- name: Cargo check release on Rust ${{ matrix.toolchain }}
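
Context on the new step: `-D warnings` promotes every clippy (and rustc) warning to a hard error, so the MSRV job now fails on any fresh lint rather than just printing it. A minimal illustration of the kind of code this gate rejects (hypothetical snippet, not from this repo):

```rust
// Under `cargo clippy -- -D warnings`, this fails the build instead of
// merely warning (clippy::needless_return is a default style lint).
fn double(x: u64) -> u64 {
    return x * 2; // clippy: unneeded `return` statement
}

fn main() {
    println!("{}", double(21));
}
```
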
7 changes: 7 additions & 0 deletions ldk-server-protos/Cargo.toml
@@ -5,6 +5,13 @@ edition = "2021"

build = "build.rs"

+ # We use a cfg instead of a feature for genproto to prevent it from being
+ # enabled with --all-features. Proto generation is a developer-only tool that
+ # requires external dependencies (protoc) and shouldn't be triggered accidentally.
+ # This lint configuration tells Cargo that genproto is an expected custom cfg.
+ [lints.rust]
Contributor: Not sure I understand this correctly, but running the command to generate updated proto objects usually introduces some format changes to all the existing generated Rust code, and you then need to re-format each file manually. Does this also fix that, or is there a way we can handle that here?

Collaborator (author): You're just supposed to run cargo fmt after you generate; not really relevant here, though.

Contributor: Right. Just wanted to know if there's a way to make it skip re-formatting files that weren't touched, but I agree it's not relevant here.

+ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(genproto)'] }

[features]
default = []
serde = ["dep:serde", "dep:bytes"]
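
To illustrate the comment in this diff: unlike a feature, a custom cfg is only set when passed explicitly (e.g. `RUSTFLAGS="--cfg genproto" cargo build`), so `--all-features` can never switch proto generation on, and the `unexpected_cfgs` entry keeps rustc's check-cfg machinery from warning about the unknown cfg. A sketch of how a build.rs might gate on it; the prost-build call and paths are illustrative assumptions, not this repo's exact script:

```rust
// build.rs — hedged sketch. Regenerate with:
//   RUSTFLAGS="--cfg genproto" cargo build
// then run `cargo fmt` over the regenerated files (see the thread above).
fn main() {
    // Compiled out entirely unless the custom cfg is set, so a plain
    // `cargo build --all-features` never needs protoc installed.
    #[cfg(genproto)]
    generate_protos();
}

#[cfg(genproto)]
fn generate_protos() {
    // Hypothetical proto paths; requires `protoc` on PATH.
    prost_build::compile_protos(&["src/proto/api.proto"], &["src/proto/"])
        .expect("failed to compile protos");
}
```
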
2 changes: 1 addition & 1 deletion ldk-server/src/api/bolt11_send.rs
@@ -9,7 +9,7 @@ use std::str::FromStr;
pub(crate) fn handle_bolt11_send_request(
context: Context, request: Bolt11SendRequest,
) -> Result<Bolt11SendResponse, LdkServerError> {
- let invoice = Bolt11Invoice::from_str(&request.invoice.as_str())
+ let invoice = Bolt11Invoice::from_str(request.invoice.as_str())
.map_err(|_| ldk_node::NodeError::InvalidInvoice)?;

let route_parameters = match request.route_parameters {
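
The one-character change above (mirrored in bolt12_send.rs below) fixes clippy's needless_borrow lint: `as_str()` already yields a `&str`, so the leading `&` only builds a `&&str` that rustc immediately auto-derefs again. A standalone illustration:

```rust
fn parse_len(s: &str) -> usize {
    s.len()
}

fn main() {
    let invoice = String::from("lnbc1...");
    // parse_len(&invoice.as_str()); // &&str — clippy::needless_borrow
    let n = parse_len(invoice.as_str()); // already a &str
    println!("{n}");
}
```
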
2 changes: 1 addition & 1 deletion ldk-server/src/api/bolt12_send.rs
@@ -10,7 +10,7 @@ pub(crate) fn handle_bolt12_send_request(
context: Context, request: Bolt12SendRequest,
) -> Result<Bolt12SendResponse, LdkServerError> {
let offer =
- Offer::from_str(&request.offer.as_str()).map_err(|_| ldk_node::NodeError::InvalidOffer)?;
+ Offer::from_str(request.offer.as_str()).map_err(|_| ldk_node::NodeError::InvalidOffer)?;

let route_parameters = match request.route_parameters {
Some(params) => {
5 changes: 1 addition & 4 deletions ldk-server/src/api/error.rs
@@ -29,13 +29,11 @@ impl fmt::Display for LdkServerError {
}

#[derive(Clone, Debug, PartialEq, Eq)]
- #[allow(clippy::enum_variant_names)]
pub(crate) enum LdkServerErrorCode {
/// Please refer to [`protos::error::ErrorCode::InvalidRequestError`].
InvalidRequestError,

- /// Please refer to [`protos::error::ErrorCode::AuthError`].
- AuthError,

/// Please refer to [`protos::error::ErrorCode::LightningError`].
LightningError,

@@ -47,7 +45,6 @@ impl fmt::Display for LdkServerErrorCode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LdkServerErrorCode::InvalidRequestError => write!(f, "InvalidRequestError"),
- LdkServerErrorCode::AuthError => write!(f, "AuthError"),
LdkServerErrorCode::LightningError => write!(f, "LightningError"),
LdkServerErrorCode::InternalServerError => write!(f, "InternalServerError"),
}
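
For reference, the dropped attribute suppressed clippy::enum_variant_names, which warns when every variant repeats the same affix; why the allow is no longer needed here isn't shown in the diff, but the lint itself looks like this (paraphrasing clippy's own doc example):

```rust
// clippy::enum_variant_names: every variant ends in "Cake".
#[allow(dead_code)] // keep the sketch compiling on its own
enum Cake {
    BlackForestCake,
    HummingbirdCake,
    BattenbergCake,
}

fn main() {}
```
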
4 changes: 1 addition & 3 deletions ldk-server/src/api/get_payment_details.rs
@@ -19,9 +19,7 @@ pub(crate) fn handle_get_payment_details_request(

let payment_details = context.node.payment(&PaymentId(payment_id_bytes));

- let response = GetPaymentDetailsResponse {
- payment: payment_details.map(|payment| payment_to_proto(payment)),
- };
+ let response = GetPaymentDetailsResponse { payment: payment_details.map(payment_to_proto) };

Ok(response)
}
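
This change (and the identical one in list_channels.rs below) fixes clippy's redundant_closure lint: a closure that only forwards its argument to a function can be replaced by the function path itself. Minimal illustration:

```rust
fn square(x: u32) -> u32 {
    x * x
}

fn main() {
    let xs = vec![1u32, 2, 3];
    // let ys: Vec<u32> = xs.into_iter().map(|x| square(x)).collect(); // clippy::redundant_closure
    let ys: Vec<u32> = xs.into_iter().map(square).collect();
    println!("{ys:?}");
}
```
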
2 changes: 1 addition & 1 deletion ldk-server/src/api/list_channels.rs
@@ -6,7 +6,7 @@ use ldk_server_protos::api::{ListChannelsRequest, ListChannelsResponse};
pub(crate) fn handle_list_channels_request(
context: Context, _request: ListChannelsRequest,
) -> Result<ListChannelsResponse, LdkServerError> {
- let channels = context.node.list_channels().into_iter().map(|c| channel_to_proto(c)).collect();
+ let channels = context.node.list_channels().into_iter().map(channel_to_proto).collect();

let response = ListChannelsResponse { channels };
Ok(response)
2 changes: 1 addition & 1 deletion ldk-server/src/api/onchain_send.rs
@@ -18,7 +18,7 @@ pub(crate) fn handle_onchain_send_request(
)
})?;

- let fee_rate = request.fee_rate_sat_per_vb.map(FeeRate::from_sat_per_vb).flatten();
+ let fee_rate = request.fee_rate_sat_per_vb.and_then(FeeRate::from_sat_per_vb);
let txid = match (request.amount_sats, request.send_all) {
(Some(amount_sats), None) => {
context.node.onchain_payment().send_to_address(&address, amount_sats, fee_rate)?
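
Here the fix is clippy's map_flatten lint: on `Option`, `.map(f).flatten()` where `f` itself returns an `Option` is exactly `.and_then(f)`. Self-contained sketch:

```rust
fn main() {
    let sat_per_vb: Option<u64> = Some(3);

    // Before: map + flatten builds an Option<Option<u64>> and unwraps one layer.
    let a: Option<u64> = sat_per_vb.map(|v| v.checked_mul(250)).flatten();
    // After: and_then does the same in one step (clippy::map_flatten).
    let b: Option<u64> = sat_per_vb.and_then(|v| v.checked_mul(250));

    assert_eq!(a, b);
}
```
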
3 changes: 3 additions & 0 deletions ldk-server/src/io/events/event_publisher.rs
@@ -41,9 +41,12 @@ pub trait EventPublisher: Send + Sync {
async fn publish(&self, event: EventEnvelope) -> Result<(), LdkServerError>;
}

+ /// A no-op implementation of the [`EventPublisher`] trait.
+ #[cfg(not(feature = "events-rabbitmq"))]
pub(crate) struct NoopEventPublisher;

#[async_trait]
+ #[cfg(not(feature = "events-rabbitmq"))]
impl EventPublisher for NoopEventPublisher {
/// Publishes an event to a no-op sink, effectively discarding it.
///
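
The diff truncates the impl here; given the doc comment, the body presumably just returns `Ok(())`. A hedged sketch of the complete no-op implementation (the method body is an assumption, not shown in the diff):

```rust
#[cfg(not(feature = "events-rabbitmq"))]
#[async_trait]
impl EventPublisher for NoopEventPublisher {
    /// Publishes an event to a no-op sink, effectively discarding it.
    async fn publish(&self, _event: EventEnvelope) -> Result<(), LdkServerError> {
        Ok(())
    }
}
```
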
19 changes: 0 additions & 19 deletions ldk-server/src/io/persist/paginated_kv_store.rs
@@ -49,25 +49,6 @@ pub trait PaginatedKVStore: Send + Sync {
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, time: i64, buf: &[u8],
) -> Result<(), io::Error>;

- /// Removes any data that had previously been persisted under the given `key`.
- ///
- /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
- /// remove the given `key` at some point in time after the method returns, e.g., as part of an
- /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
- /// [`PaginatedKVStore::list`] might include the removed key until the changes are actually persisted.
- ///
- /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
- /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
- /// potentially get lost on crash after the method returns. Therefore, this flag should only be
- /// set for `remove` operations that can be safely replayed at a later time.
- ///
- /// Returns successfully if no data will be stored for the given `primary_namespace`,
- /// `secondary_namespace`, and `key`, independently of whether it was present before its
- /// invocation or not.
- fn remove(
- &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
- ) -> Result<(), io::Error>;
-
/// Returns a paginated list of keys that are stored under the given `secondary_namespace` in
/// `primary_namespace`, ordered in descending order of `time`.
///
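
With `remove` gone, the trait's remaining read path is `list`, which pages through keys in descending `time` order via a `(key, time)` token. A hedged usage sketch; the response field names `keys` and `next_page_token` are assumptions about the return type, which the diff doesn't show:

```rust
// Drain every key under a namespace, one page at a time.
fn collect_all_keys(
    store: &dyn PaginatedKVStore, primary_namespace: &str, secondary_namespace: &str,
) -> std::io::Result<Vec<String>> {
    let mut all_keys = Vec::new();
    let mut page_token: Option<(String, i64)> = None;
    loop {
        let page = store.list(primary_namespace, secondary_namespace, page_token.take())?;
        all_keys.extend(page.keys);
        match page.next_page_token {
            Some(token) => page_token = Some(token),
            None => return Ok(all_keys),
        }
    }
}
```
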
103 changes: 17 additions & 86 deletions ldk-server/src/io/persist/sqlite_store/mod.rs
Collaborator: Should be okay for now, but that's a reminder that before we let the codebases diverge even further, we should finally upstream PaginatedKVStore to LDK and add SqliteStore/PostgresStore there, too.

Collaborator (author): Yeah, good callout; will start on that soon.

@@ -20,7 +20,6 @@ const LIST_KEYS_MAX_PAGE_SIZE: i32 = 100;

pub struct SqliteStore {
connection: Arc<Mutex<Connection>>,
- data_dir: PathBuf,
paginated_kv_table_name: String,
}

@@ -44,18 +43,18 @@ impl SqliteStore {
data_dir.display(),
e
);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;
- let mut db_file_path = data_dir.clone();
+ let mut db_file_path = data_dir;
db_file_path.push(db_file_name);

let connection = Connection::open(db_file_path.clone()).map_err(|e| {
let msg =
format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let sql = format!("SELECT user_version FROM pragma_user_version");
let sql = "SELECT user_version FROM pragma_user_version".to_string();
let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();

if version_res == 0 {
@@ -69,14 +68,14 @@
)
.map_err(|e| {
let msg = format!("Failed to set PRAGMA user_version: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;
} else if version_res > SCHEMA_USER_VERSION {
let msg = format!(
"Failed to open database: incompatible schema version {}. Expected: {}",
version_res, SCHEMA_USER_VERSION
);
- return Err(io::Error::new(io::ErrorKind::Other, msg));
+ return Err(io::Error::other(msg));
}

let create_paginated_kv_table_sql = format!(
@@ -92,7 +91,7 @@

connection.execute(&create_paginated_kv_table_sql, []).map_err(|e| {
let msg = format!("Failed to create table {}: {}", paginated_kv_table_name, e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let index_creation_time_sql = format!(
@@ -105,16 +104,11 @@
"Failed to create index on creation_time, table {}: {}",
paginated_kv_table_name, e
);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let connection = Arc::new(Mutex::new(connection));
- Ok(Self { connection, data_dir, paginated_kv_table_name })
- }
-
- /// Returns the data directory.
- pub fn get_data_dir(&self) -> PathBuf {
- self.data_dir.clone()
+ Ok(Self { connection, paginated_kv_table_name })
}

fn read_internal(
@@ -129,7 +123,7 @@

let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
let msg = format!("Failed to prepare statement: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let res = stmt
@@ -159,43 +153,11 @@
PrintableString(key),
e
);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
},
})?;
Ok(res)
}

- fn remove_internal(
- &self, kv_table_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str,
- ) -> io::Result<()> {
- check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
-
- let locked_conn = self.connection.lock().unwrap();
-
- let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", kv_table_name);
-
- let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
- let msg = format!("Failed to prepare statement: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
- })?;
-
- stmt.execute(named_params! {
- ":primary_namespace": primary_namespace,
- ":secondary_namespace": secondary_namespace,
- ":key": key,
- })
- .map_err(|e| {
- let msg = format!(
- "Failed to delete key {}/{}/{}: {}",
- PrintableString(primary_namespace),
- PrintableString(secondary_namespace),
- PrintableString(key),
- e
- );
- io::Error::new(io::ErrorKind::Other, msg)
- })?;
- Ok(())
- }
}

impl PaginatedKVStore for SqliteStore {
@@ -227,7 +189,7 @@

let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
let msg = format!("Failed to prepare statement: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

stmt.execute(named_params! {
@@ -246,21 +208,10 @@
PrintableString(key),
e
);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})
}

- fn remove(
- &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
- ) -> io::Result<()> {
- self.remove_internal(
- &self.paginated_kv_table_name,
- primary_namespace,
- secondary_namespace,
- key,
- )
- }
-
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
page_token: Option<(String, i64)>,
@@ -278,7 +229,7 @@

let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
let msg = format!("Failed to prepare statement: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let mut keys: Vec<String> = Vec::new();
@@ -301,14 +252,14 @@
)
.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;

let mut last_creation_time: Option<i64> = None;
for r in rows_iter {
let (k, ct) = r.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
- io::Error::new(io::ErrorKind::Other, msg)
+ io::Error::other(msg)
})?;
keys.push(k);
last_creation_time = Some(ct);
@@ -333,15 +284,6 @@ mod tests {
use rand::{thread_rng, Rng};
use std::panic::RefUnwindSafe;

- impl Drop for SqliteStore {
- fn drop(&mut self) {
- match fs::remove_dir_all(&self.data_dir) {
- Err(e) => println!("Failed to remove test store directory: {}", e),
- _ => {},
- }
- }
- }
-
#[test]
fn read_write_remove_list_persist() {
let mut temp_path = random_storage_path();
@@ -413,14 +355,8 @@
let read_data = kv_store.read(primary_namespace, secondary_namespace, testkey).unwrap();
assert_eq!(data, &*read_data);

- kv_store.remove(primary_namespace, secondary_namespace, testkey, false).unwrap();
-
- let listed_keys = list_all_keys(primary_namespace, secondary_namespace);
- assert_eq!(listed_keys.len(), 109);
-
// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being KVSTORE_NAMESPACE_KEY_MAX_LEN
- let max_chars: String =
- std::iter::repeat('A').take(KVSTORE_NAMESPACE_KEY_MAX_LEN).collect();
+ let max_chars: String = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN);
kv_store.write(&max_chars, &max_chars, &max_chars, 0, &data).unwrap();

println!("{:?}", listed_keys);
@@ -431,10 +367,5 @@

let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
assert_eq!(data, &*read_data);

- kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
-
- let listed_keys = list_all_keys(&max_chars, &max_chars);
- assert_eq!(listed_keys.len(), 0);
}
}
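
Most of this file's churn is mechanical: `io::Error::new(io::ErrorKind::Other, msg)` becomes `io::Error::other(msg)` (a shorthand constructor, stable since Rust 1.74), and in the tests `std::iter::repeat('A').take(n).collect()` collapses to `"A".repeat(n)`. A quick sketch of the error-constructor equivalence:

```rust
use std::io;

fn main() {
    let msg = format!("Failed to open database file {}: {}", "ldk_server.db", "disk full");

    let verbose = io::Error::new(io::ErrorKind::Other, msg.clone());
    let concise = io::Error::other(msg); // identical kind and payload

    assert_eq!(verbose.kind(), concise.kind());
    assert_eq!(verbose.to_string(), concise.to_string());
}
```
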