diff --git a/src/connection.rs b/src/connection.rs index 1c61027..069ed8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -526,9 +526,8 @@ impl PgReplicationConnection { unsafe { PQfreemem(buffer as *mut c_void) }; Ok(ReadResult::Data(data)) } - 0 | -2 => { - // 0 : According to libpq docs, 0 means async mode and no data ready - // 2 : No complete data available yet, would block + 0 => { + // 0: According to libpq docs, async mode and no data ready Ok(ReadResult::WouldBlock) } -1 => { @@ -536,6 +535,12 @@ impl PgReplicationConnection { debug!("COPY stream finished (PQgetCopyData returned -1)"); Ok(ReadResult::CopyDone) } + -2 => { + let error_msg = self.last_error_message(); + Err(ReplicationError::protocol(format!( + "PQgetCopyData error: {error_msg}" + ))) + } other => Err(ReplicationError::protocol(format!( "Unexpected PQgetCopyData result: {other}" ))), diff --git a/src/lsn.rs b/src/lsn.rs index 397e809..5b8e747 100644 --- a/src/lsn.rs +++ b/src/lsn.rs @@ -12,7 +12,7 @@ //! we need a thread-safe way to share the committed LSN from consumer back to producer //! for accurate feedback to PostgreSQL. -use crate::types::{format_lsn, XLogRecPtr}; +use crate::types::{format_lsn, CachePadded, XLogRecPtr}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tracing::{debug, info}; @@ -42,9 +42,9 @@ use tracing::{debug, info}; #[derive(Debug)] pub struct SharedLsnFeedback { /// Last flushed LSN - data written to destination before commit - flushed_lsn: AtomicU64, + flushed_lsn: CachePadded, /// Last applied/replayed LSN - data committed to destination - applied_lsn: AtomicU64, + applied_lsn: CachePadded, } impl SharedLsnFeedback { @@ -64,8 +64,8 @@ impl SharedLsnFeedback { /// ``` pub fn new() -> Self { Self { - flushed_lsn: AtomicU64::new(0), - applied_lsn: AtomicU64::new(0), + flushed_lsn: CachePadded::new(AtomicU64::new(0)), + applied_lsn: CachePadded::new(AtomicU64::new(0)), } } @@ -244,8 +244,8 @@ impl Default for SharedLsnFeedback { impl Clone for SharedLsnFeedback { fn clone(&self) -> Self { Self { - flushed_lsn: AtomicU64::new(self.flushed_lsn.load(Ordering::Acquire)), - applied_lsn: AtomicU64::new(self.applied_lsn.load(Ordering::Acquire)), + flushed_lsn: CachePadded::new(AtomicU64::new(self.flushed_lsn.load(Ordering::Acquire))), + applied_lsn: CachePadded::new(AtomicU64::new(self.applied_lsn.load(Ordering::Acquire))), } } } diff --git a/src/stream.rs b/src/stream.rs index a9ef8a2..b63c129 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -645,6 +645,11 @@ impl LogicalReplicationStream { return Err(e); } + if e.is_permanent() { + error!("Permanent error in event processing: {}", e); + return Err(e); + } + // Check if we've exhausted retry attempts if attempt >= MAX_ATTEMPTS { error!( @@ -719,7 +724,7 @@ impl LogicalReplicationStream { } // Get the remaining bytes for message parsing - let message_data = reader.read_bytes(reader.remaining())?; + let message_data = reader.read_bytes_buf(reader.remaining())?; let replication_message = self.parser.parse_wal_message(&message_data)?; self.convert_to_change_event(replication_message, start_lsn) } diff --git a/src/types.rs b/src/types.rs index 1e95725..1e55944 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,6 +23,74 @@ pub type Oid = u32; /// PostgreSQL Timestamp (microseconds since 2000-01-01) pub type TimestampTz = i64; +/// Pads and aligns a value to the length of a cache line to reduce false sharing. +#[derive(Debug)] +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "arm64ec", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips32r6", + target_arch = "mips64", + target_arch = "mips64r6", + target_arch = "sparc", + target_arch = "hexagon", + ), + repr(align(32)) +)] +#[cfg_attr(target_arch = "m68k", repr(align(16)))] +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "arm64ec", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips32r6", + target_arch = "mips64", + target_arch = "mips64r6", + target_arch = "sparc", + target_arch = "hexagon", + target_arch = "m68k", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub struct CachePadded { + value: T, +} + +impl CachePadded { + #[inline] + pub const fn new(value: T) -> Self { + Self { value } + } +} + +impl std::ops::Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +impl std::ops::DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.value + } +} + /// Convert SystemTime to PostgreSQL timestamp format (microseconds since 2000-01-01) /// /// PostgreSQL uses a different epoch than Unix (2000-01-01 vs 1970-01-01). @@ -811,6 +879,21 @@ mod tests { use chrono::{TimeZone, Utc}; use std::time::{SystemTime, UNIX_EPOCH}; + #[test] + fn test_cache_padded_deref_and_mut() { + let mut padded = CachePadded::new(10u32); + assert_eq!(*padded, 10); + *padded = 42; + assert_eq!(*padded, 42); + } + + #[test] + fn test_cache_padded_alignment() { + let padded = CachePadded::new(0u8); + let addr = (&*padded as *const u8 as usize) % std::mem::align_of::>(); + assert_eq!(addr, 0); + } + #[test] fn test_lsn_parsing() { assert_eq!(parse_lsn("0/12345678").unwrap(), 0x12345678);