Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,16 +526,21 @@ impl PgReplicationConnection {
unsafe { PQfreemem(buffer as *mut c_void) };
Ok(ReadResult::Data(data))
}
0 | -2 => {
// 0 : According to libpq docs, 0 means async mode and no data ready
// 2 : No complete data available yet, would block
0 => {
// 0: According to libpq docs, async mode and no data ready
Ok(ReadResult::WouldBlock)
}
-1 => {
// COPY finished - this is a graceful shutdown signal
debug!("COPY stream finished (PQgetCopyData returned -1)");
Ok(ReadResult::CopyDone)
}
-2 => {
let error_msg = self.last_error_message();
Err(ReplicationError::protocol(format!(
"PQgetCopyData error: {error_msg}"
)))
}
other => Err(ReplicationError::protocol(format!(
"Unexpected PQgetCopyData result: {other}"
))),
Expand Down
14 changes: 7 additions & 7 deletions src/lsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//! we need a thread-safe way to share the committed LSN from consumer back to producer
//! for accurate feedback to PostgreSQL.

use crate::types::{format_lsn, XLogRecPtr};
use crate::types::{format_lsn, CachePadded, XLogRecPtr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, info};
Expand Down Expand Up @@ -42,9 +42,9 @@ use tracing::{debug, info};
#[derive(Debug)]
pub struct SharedLsnFeedback {
/// Last flushed LSN - data written to destination before commit
flushed_lsn: AtomicU64,
flushed_lsn: CachePadded<AtomicU64>,
/// Last applied/replayed LSN - data committed to destination
applied_lsn: AtomicU64,
applied_lsn: CachePadded<AtomicU64>,
}

impl SharedLsnFeedback {
Expand All @@ -64,8 +64,8 @@ impl SharedLsnFeedback {
/// ```
pub fn new() -> Self {
Self {
flushed_lsn: AtomicU64::new(0),
applied_lsn: AtomicU64::new(0),
flushed_lsn: CachePadded::new(AtomicU64::new(0)),
applied_lsn: CachePadded::new(AtomicU64::new(0)),
}
}

Expand Down Expand Up @@ -244,8 +244,8 @@ impl Default for SharedLsnFeedback {
impl Clone for SharedLsnFeedback {
fn clone(&self) -> Self {
Self {
flushed_lsn: AtomicU64::new(self.flushed_lsn.load(Ordering::Acquire)),
applied_lsn: AtomicU64::new(self.applied_lsn.load(Ordering::Acquire)),
flushed_lsn: CachePadded::new(AtomicU64::new(self.flushed_lsn.load(Ordering::Acquire))),
applied_lsn: CachePadded::new(AtomicU64::new(self.applied_lsn.load(Ordering::Acquire))),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,11 @@ impl LogicalReplicationStream {
return Err(e);
}

if e.is_permanent() {
error!("Permanent error in event processing: {}", e);
return Err(e);
}

// Check if we've exhausted retry attempts
if attempt >= MAX_ATTEMPTS {
error!(
Expand Down Expand Up @@ -719,7 +724,7 @@ impl LogicalReplicationStream {
}

// Get the remaining bytes for message parsing
let message_data = reader.read_bytes(reader.remaining())?;
let message_data = reader.read_bytes_buf(reader.remaining())?;
let replication_message = self.parser.parse_wal_message(&message_data)?;
self.convert_to_change_event(replication_message, start_lsn)
}
Expand Down
83 changes: 83 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,74 @@ pub type Oid = u32;
/// PostgreSQL Timestamp (microseconds since 2000-01-01)
pub type TimestampTz = i64;

/// Pads and aligns a value to the length of a cache line to reduce false sharing.
#[derive(Debug)]
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "arm64ec",
target_arch = "powerpc64",
),
repr(align(128))
)]
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips32r6",
target_arch = "mips64",
target_arch = "mips64r6",
target_arch = "sparc",
target_arch = "hexagon",
),
repr(align(32))
)]
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "arm64ec",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips32r6",
target_arch = "mips64",
target_arch = "mips64r6",
target_arch = "sparc",
target_arch = "hexagon",
target_arch = "m68k",
target_arch = "s390x",
)),
repr(align(64))
)]
pub struct CachePadded<T> {
value: T,
}

impl<T> CachePadded<T> {
#[inline]
pub const fn new(value: T) -> Self {
Self { value }
}
}

impl<T> std::ops::Deref for CachePadded<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.value
}
}

impl<T> std::ops::DerefMut for CachePadded<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}

/// Convert SystemTime to PostgreSQL timestamp format (microseconds since 2000-01-01)
///
/// PostgreSQL uses a different epoch than Unix (2000-01-01 vs 1970-01-01).
Expand Down Expand Up @@ -811,6 +879,21 @@ mod tests {
use chrono::{TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH};

#[test]
fn test_cache_padded_deref_and_mut() {
let mut padded = CachePadded::new(10u32);
assert_eq!(*padded, 10);
*padded = 42;
assert_eq!(*padded, 42);
}

#[test]
fn test_cache_padded_alignment() {
let padded = CachePadded::new(0u8);
let addr = (&*padded as *const u8 as usize) % std::mem::align_of::<CachePadded<u8>>();
assert_eq!(addr, 0);
}

#[test]
fn test_lsn_parsing() {
assert_eq!(parse_lsn("0/12345678").unwrap(), 0x12345678);
Expand Down