From d45a43b3ea00675321981ae5a7d1143b365d0974 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 20:12:19 -0700 Subject: [PATCH 1/7] feat: add arena generation counter for handler invalidation --- crates/loro-internal/src/lib.rs | 8 +++++++- crates/loro-internal/src/loro.rs | 22 +++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/crates/loro-internal/src/lib.rs b/crates/loro-internal/src/lib.rs index 295ac5665..9fbb6cf76 100644 --- a/crates/loro-internal/src/lib.rs +++ b/crates/loro-internal/src/lib.rs @@ -13,7 +13,7 @@ pub mod diff; pub mod diff_calc; pub mod handler; pub mod sync; -use crate::sync::AtomicBool; +use crate::sync::{AtomicBool, AtomicU64}; use std::sync::Arc; mod change_meta; pub(crate) mod lock; @@ -162,6 +162,12 @@ pub struct LoroDocInner { txn: Arc>>, auto_commit: AtomicBool, detached: AtomicBool, + /// Generation counter for arena invalidation. + /// + /// This counter is incremented when the document's internals are swapped + /// (e.g., during `replace_with_shallow`). Handlers cache this value alongside + /// their `ContainerIdx` to detect when re-resolution is needed. + arena_generation: AtomicU64, local_update_subs: SubscriberSetWithQueue<(), LocalUpdateCallback, Vec>, peer_id_change_subs: SubscriberSetWithQueue<(), PeerIdUpdateCallback, ID>, first_commit_from_peer_subs: diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index db696acd0..a51e93b4c 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -2,7 +2,7 @@ use crate::encoding::json_schema::{encode_change, export_json_in_id_span}; pub use crate::encoding::ExportMode; use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload}; pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis}; -use crate::sync::AtomicBool; +use crate::sync::{AtomicBool, AtomicU64}; pub(crate) use crate::LoroDocInner; use crate::{ arena::SharedArena, @@ -111,6 +111,7 @@ impl LoroDoc { config, detached: AtomicBool::new(false), auto_commit: AtomicBool::new(false), + arena_generation: AtomicU64::new(0), observer: Arc::new(Observer::new(arena.clone())), diff_calculator: Arc::new( lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator), @@ -516,6 +517,25 @@ impl LoroDoc { self.detached.store(detached, Release); } + /// Get the current arena generation counter. + /// + /// This counter is incremented when the document's internals are swapped + /// (e.g., during `replace_with_shallow`). Handlers use this to detect + /// when their cached `ContainerIdx` needs re-resolution. + #[inline(always)] + pub fn arena_generation(&self) -> u64 { + self.arena_generation.load(Acquire) + } + + /// Increment the arena generation counter. + /// + /// This should be called after swapping document internals to invalidate + /// all cached `ContainerIdx` values in handlers. + #[inline] + pub(crate) fn bump_arena_generation(&self) { + self.arena_generation.fetch_add(1, Release); + } + #[inline(always)] pub fn peer_id(&self) -> PeerID { self.state From e95dd6a5d3201fcc8004bc98590dee0e9587c6fb Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 21:33:35 -0700 Subject: [PATCH 2/7] feat: handler generation-based caching --- crates/loro-internal/src/handler.rs | 126 +++++++++++++++-------- crates/loro-internal/src/handler/tree.rs | 10 +- 2 files changed, 89 insertions(+), 47 deletions(-) diff --git a/crates/loro-internal/src/handler.rs b/crates/loro-internal/src/handler.rs index 63205c279..ce6ae58c7 100644 --- a/crates/loro-internal/src/handler.rs +++ b/crates/loro-internal/src/handler.rs @@ -63,7 +63,7 @@ pub trait HandlerTrait: Clone + Sized { fn idx(&self) -> ContainerIdx { self.attached_handler() - .map(|x| x.container_idx) + .map(|x| x.container_idx()) .unwrap_or_else(|| ContainerIdx::from_index_and_type(u32::MAX, self.kind())) } @@ -81,7 +81,7 @@ pub trait HandlerTrait: Clone + Sized { })?; let state = inner.doc.state.clone(); let mut guard = state.lock().unwrap(); - guard.with_state_mut(inner.container_idx, f) + guard.with_state_mut(inner.container_idx(), f) } } @@ -90,13 +90,27 @@ fn create_handler(inner: &BasicHandler, id: ContainerID) -> Handler { } /// Flatten attributes that allow overlap -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct BasicHandler { id: ContainerID, - container_idx: ContainerIdx, + /// Cached container index with generation for invalidation detection. + /// The tuple contains (ContainerIdx, generation) where generation is used + /// to detect when the arena has been swapped and re-resolution is needed. + /// Uses Mutex for thread-safety (required for Sync). + cached_idx: Arc>, doc: LoroDoc, } +impl std::fmt::Debug for BasicHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BasicHandler") + .field("id", &self.id) + .field("container_idx", &self.container_idx()) + .field("doc", &"...") + .finish() + } +} + struct DetachedInner { value: T, /// If the handler attached later, this field will be filled. @@ -162,6 +176,42 @@ impl From for MaybeDetached { } impl BasicHandler { + /// Create a new BasicHandler with the given id and doc. + /// The container index is resolved from the arena and cached with the current generation. + pub(crate) fn new(id: ContainerID, doc: LoroDoc) -> Self { + let idx = doc.arena.register_container(&id); + let gen = doc.arena_generation(); + Self { + id, + cached_idx: Arc::new(Mutex::new((idx, gen))), + doc, + } + } + + /// Get the container index, re-resolving from the arena if the generation has changed. + /// This ensures handlers remain valid after operations like `replace_with_shallow`. + #[inline] + pub(crate) fn container_idx(&self) -> ContainerIdx { + let guard = self.cached_idx.lock().unwrap(); + let (idx, gen) = *guard; + if gen == self.doc.arena_generation() { + idx + } else { + drop(guard); + self.resolve_and_cache() + } + } + + /// Cold path: re-resolve the container index from the arena and update the cache. + #[cold] + fn resolve_and_cache(&self) -> ContainerIdx { + let idx = self.doc.arena.register_container(&self.id); + let gen = self.doc.arena_generation(); + let mut guard = self.cached_idx.lock().unwrap(); + *guard = (idx, gen); + idx + } + pub(crate) fn doc(&self) -> LoroDoc { self.doc.clone() } @@ -181,15 +231,11 @@ impl BasicHandler { } fn get_parent(&self) -> Option { - let parent_idx = self.doc.arena.get_parent(self.container_idx)?; + let parent_idx = self.doc.arena.get_parent(self.container_idx())?; let parent_id = self.doc.arena.get_container_id(parent_idx).unwrap(); { let kind = parent_id.container_type(); - let handler = BasicHandler { - container_idx: parent_idx, - id: parent_id, - doc: self.doc.clone(), - }; + let handler = BasicHandler::new(parent_id, self.doc.clone()); Some(match kind { ContainerType::Map => Handler::Map(MapHandler { @@ -221,7 +267,7 @@ impl BasicHandler { .state .lock() .unwrap() - .get_value_by_idx(self.container_idx) + .get_value_by_idx(self.container_idx()) } pub fn get_deep_value(&self) -> LoroValue { @@ -229,12 +275,12 @@ impl BasicHandler { .state .lock() .unwrap() - .get_container_deep_value(self.container_idx) + .get_container_deep_value(self.container_idx()) } fn with_state(&self, f: impl FnOnce(&mut State) -> R) -> R { let mut guard = self.doc.state.lock().unwrap(); - guard.with_state_mut(self.container_idx, f) + guard.with_state_mut(self.container_idx(), f) } pub fn parent(&self) -> Option { @@ -246,7 +292,7 @@ impl BasicHandler { .state .lock() .unwrap() - .is_deleted(self.container_idx) + .is_deleted(self.container_idx()) } } @@ -1027,11 +1073,7 @@ impl HandlerTrait for Handler { impl Handler { pub(crate) fn new_attached(id: ContainerID, doc: LoroDoc) -> Self { let kind = id.container_type(); - let handler = BasicHandler { - container_idx: doc.arena.register_container(&id), - id, - doc, - }; + let handler = BasicHandler::new(id, doc); match kind { ContainerType::Map => Self::Map(MapHandler { @@ -1861,7 +1903,7 @@ impl TextHandler { }; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert { slice: ListSlice::RawStr { str: Cow::Borrowed(s), @@ -1933,7 +1975,7 @@ impl TextHandler { for range in ranges.iter().rev() { let event_start = event_end - range.event_len as isize; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(ListOp::Delete(DeleteSpanWithId::new( range.id_start, range.entity_start as isize, @@ -2078,7 +2120,7 @@ impl TextHandler { let is_delete = matches!(&value, &LoroValue::Null); let mut doc_state = inner.doc.state.lock().unwrap(); - let len = doc_state.with_state_mut(inner.container_idx, |state| { + let len = doc_state.with_state_mut(inner.container_idx(), |state| { state.as_richtext_state_mut().unwrap().len(pos_type) }); @@ -2091,7 +2133,7 @@ impl TextHandler { } let (entity_range, skip, missing_style_key, event_start, event_end) = doc_state - .with_state_mut(inner.container_idx, |state| { + .with_state_mut(inner.container_idx(), |state| { let state = state.as_richtext_state_mut().unwrap(); let event_start = state.index_to_event_index(start, pos_type); let event_end = state.index_to_event_index(end, pos_type); @@ -2137,7 +2179,7 @@ impl TextHandler { drop(style_config); drop(doc_state); txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(ListOp::StyleStart { start: entity_start as u32, end: entity_end as u32, @@ -2154,7 +2196,7 @@ impl TextHandler { )?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(ListOp::StyleEnd), EventHint::MarkEnd, &inner.doc, @@ -2632,7 +2674,7 @@ impl ListHandler { } txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert { slice: ListSlice::RawData(Cow::Owned(vec![v.clone()])), pos, @@ -2716,7 +2758,7 @@ impl ListHandler { let container_id = ContainerID::new_normal(id, child.kind()); let v = LoroValue::Container(container_id.clone()); txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert { slice: ListSlice::RawData(Cow::Owned(vec![v.clone()])), pos, @@ -2762,7 +2804,7 @@ impl ListHandler { for id in ids.into_iter() { txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(ListOp::Delete(DeleteSpanWithId::new( id.id(), pos as isize, @@ -2836,7 +2878,7 @@ impl ListHandler { pub fn get_deep_value_with_id(&self) -> LoroResult { let inner = self.inner.try_attached_state()?; Ok(inner.with_doc_state(|state| { - state.get_container_deep_value_with_id(inner.container_idx, None) + state.get_container_deep_value_with_id(inner.container_idx(), None) })) } @@ -3095,7 +3137,7 @@ impl MovableListHandler { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert { slice: ListSlice::RawData(Cow::Owned(vec![v.clone()])), pos: op_index, @@ -3172,7 +3214,7 @@ impl MovableListHandler { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Move { from: op_from as u32, to: op_to as u32, @@ -3290,7 +3332,7 @@ impl MovableListHandler { let v = LoroValue::Container(container_id.clone()); let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert { slice: ListSlice::RawData(Cow::Owned(vec![v.clone()])), pos: op_index, @@ -3350,7 +3392,7 @@ impl MovableListHandler { }); let hint = EventHint::SetList { index, value }; - txn.apply_local_op(inner.container_idx, op, hint, &inner.doc) + txn.apply_local_op(inner.container_idx(), op, hint, &inner.doc) } pub fn set_container(&self, pos: usize, child: H) -> LoroResult { @@ -3393,7 +3435,7 @@ impl MovableListHandler { }; let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Set { elem_id: elem_id.to_id(), value: v.clone(), @@ -3456,7 +3498,7 @@ impl MovableListHandler { let inner = self.inner.try_attached_state()?; for (id, op_pos) in ids.into_iter().zip(new_poses.into_iter()) { txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::List(ListOp::Delete(DeleteSpanWithId::new( id, op_pos as isize, @@ -3542,7 +3584,7 @@ impl MovableListHandler { .state .lock() .unwrap() - .get_container_deep_value_with_id(inner.container_idx, None) + .get_container_deep_value_with_id(inner.container_idx(), None) } pub fn get(&self, index: usize) -> Option { @@ -3794,7 +3836,7 @@ impl MapHandler { let inner = this.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Map(crate::container::map::MapSet { key: key.into(), value: Some(value.clone()), @@ -3830,7 +3872,7 @@ impl MapHandler { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Map(crate::container::map::MapSet { key: key.into(), value: Some(value.clone()), @@ -3868,7 +3910,7 @@ impl MapHandler { let id = txn.next_id(); let container_id = ContainerID::new_normal(id, child.kind()); txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Map(crate::container::map::MapSet { key: key.into(), value: Some(LoroValue::Container(container_id.clone())), @@ -3897,7 +3939,7 @@ impl MapHandler { pub fn delete_with_txn(&self, txn: &mut Transaction, key: &str) -> LoroResult<()> { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Map(crate::container::map::MapSet { key: key.into(), value: None, @@ -3984,7 +4026,7 @@ impl MapHandler { method: "get_deep_value_with_id", }), MaybeDetached::Attached(inner) => Ok(inner.with_doc_state(|state| { - state.get_container_deep_value_with_id(inner.container_idx, None) + state.get_container_deep_value_with_id(inner.container_idx(), None) })), } } @@ -4220,7 +4262,7 @@ pub mod counter { fn increment_with_txn(&self, txn: &mut Transaction, n: f64) -> LoroResult<()> { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Counter(n), EventHint::Counter(n), &inner.doc, diff --git a/crates/loro-internal/src/handler/tree.rs b/crates/loro-internal/src/handler/tree.rs index 36e5829e2..d96189f5b 100644 --- a/crates/loro-internal/src/handler/tree.rs +++ b/crates/loro-internal/src/handler/tree.rs @@ -309,7 +309,7 @@ impl TreeHandler { } }; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Tree(Arc::new(TreeOp::Delete { target })), EventHint::Tree(smallvec![TreeDiffItem { target, @@ -409,7 +409,7 @@ impl TreeHandler { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Tree(Arc::new(TreeOp::Create { target, parent: parent.tree_id(), @@ -492,7 +492,7 @@ impl TreeHandler { a.with_txn(|txn| { let inner = self.inner.try_attached_state()?; txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Tree(Arc::new(TreeOp::Move { target, parent: parent.tree_id(), @@ -671,7 +671,7 @@ impl TreeHandler { position: FractionalIndex, ) -> LoroResult { txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Tree(Arc::new(TreeOp::Create { target: tree_id, parent: parent.tree_id(), @@ -702,7 +702,7 @@ impl TreeHandler { old_index: usize, ) -> LoroResult<()> { txn.apply_local_op( - inner.container_idx, + inner.container_idx(), crate::op::RawOpContent::Tree(Arc::new(TreeOp::Move { target, parent: parent.tree_id(), From b2def002ae438878b81ab8420b9837adfb7b9af7 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 21:40:59 -0700 Subject: [PATCH 3/7] feat: add swap_internals_from to swap internal doc data --- crates/loro-internal/src/arena.rs | 38 +++++ crates/loro-internal/src/loro.rs | 72 ++++++++++ crates/loro-internal/src/oplog.rs | 38 +++++ crates/loro-internal/src/state.rs | 39 ++++++ .../src/state/container_store.rs | 10 ++ .../src/state/container_store/inner_store.rs | 8 ++ crates/loro/src/lib.rs | 48 +++++++ crates/loro/tests/loro_rust_test.rs | 131 ++++++++++++++++++ 8 files changed, 384 insertions(+) diff --git a/crates/loro-internal/src/arena.rs b/crates/loro-internal/src/arena.rs index b7ab1bf22..e63284a77 100644 --- a/crates/loro-internal/src/arena.rs +++ b/crates/loro-internal/src/arena.rs @@ -763,4 +763,42 @@ impl SharedArena { let mut slot = self.inner.parent_resolver.lock().unwrap(); *slot = resolver.map(|f| Arc::new(f) as Arc); } + + /// Swap the internal contents of this arena with another arena. + /// + /// This is used by `swap_internals_from` to atomically replace document internals + /// while preserving the `Arc` wrappers so that existing references remain valid. + /// + /// Note: The `str` arena is shared via `Arc` and is NOT swapped - both arenas + /// will continue to share the same string storage after this call. + pub(crate) fn swap_contents_with(&self, other: &SharedArena) { + // Lock all mutexes in a consistent order to avoid deadlocks + let mut self_idx_to_id = self.inner.container_idx_to_id.lock().unwrap(); + let mut other_idx_to_id = other.inner.container_idx_to_id.lock().unwrap(); + std::mem::swap(&mut *self_idx_to_id, &mut *other_idx_to_id); + + let mut self_depth = self.inner.depth.lock().unwrap(); + let mut other_depth = other.inner.depth.lock().unwrap(); + std::mem::swap(&mut *self_depth, &mut *other_depth); + + let mut self_id_to_idx = self.inner.container_id_to_idx.lock().unwrap(); + let mut other_id_to_idx = other.inner.container_id_to_idx.lock().unwrap(); + std::mem::swap(&mut *self_id_to_idx, &mut *other_id_to_idx); + + let mut self_parents = self.inner.parents.lock().unwrap(); + let mut other_parents = other.inner.parents.lock().unwrap(); + std::mem::swap(&mut *self_parents, &mut *other_parents); + + let mut self_values = self.inner.values.lock().unwrap(); + let mut other_values = other.inner.values.lock().unwrap(); + std::mem::swap(&mut *self_values, &mut *other_values); + + let mut self_root_c_idx = self.inner.root_c_idx.lock().unwrap(); + let mut other_root_c_idx = other.inner.root_c_idx.lock().unwrap(); + std::mem::swap(&mut *self_root_c_idx, &mut *other_root_c_idx); + + // Note: We intentionally do NOT swap the str arena or parent_resolver. + // The str arena is append-only and shared, and the parent_resolver + // is document-specific configuration that should stay with the original doc. + } } diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index a51e93b4c..db3888dd4 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -2017,6 +2017,78 @@ impl LoroDoc { .hide_empty_root_containers .store(hide, std::sync::atomic::Ordering::Relaxed); } + + /// Swap the internal data of this document with another document. + /// + /// This method atomically replaces the document's internal state (OpLog, DocState, Arena) + /// with the contents from another document, while preserving: + /// - The `Arc` wrappers (so existing references remain valid) + /// - Subscriptions and observers + /// - Configuration + /// - Peer ID + /// + /// After this call: + /// - `self` will contain the data that was in `other` + /// - `other` will contain the data that was in `self` + /// - The arena generation counter is bumped, invalidating cached `ContainerIdx` in handlers + /// + /// # Use Case + /// + /// This is primarily used by `replace_with_shallow` to atomically replace a document's + /// contents with a trimmed shallow snapshot while preserving all external references. + /// + /// # Panics + /// + /// - Panics if either document is in a transaction + /// - Panics if either document has an uncommitted change + /// + /// # Example + /// + /// ```ignore + /// // Create a shallow snapshot and swap it in + /// let shallow_bytes = doc.export(ExportMode::shallow_snapshot(&frontiers))?; + /// let temp_doc = LoroDoc::new(); + /// temp_doc.import(&shallow_bytes)?; + /// doc.swap_internals_from(&temp_doc); + /// // Now doc contains the shallow snapshot, and temp_doc contains the old data + /// ``` + pub fn swap_internals_from(&self, other: &LoroDoc) { + // Ensure no transactions are active + let (self_options, _self_guard) = self.implicit_commit_then_stop(); + let (other_options, _other_guard) = other.implicit_commit_then_stop(); + + // Lock all components in a consistent order to avoid deadlocks + // Order: oplog -> state (following LockKind ordering) + let mut self_oplog = self.oplog.lock().unwrap(); + let mut other_oplog = other.oplog.lock().unwrap(); + let mut self_state = self.state.lock().unwrap(); + let mut other_state = other.state.lock().unwrap(); + + // Swap OpLog contents + self_oplog.swap_data_with(&mut other_oplog); + + // Swap DocState contents + self_state.swap_data_with(&mut other_state); + + // Swap Arena contents + self.arena.swap_contents_with(&other.arena); + + // Drop locks before bumping generation + drop(self_state); + drop(other_state); + drop(self_oplog); + drop(other_oplog); + + // Bump arena generation to invalidate cached ContainerIdx in handlers + self.bump_arena_generation(); + other.bump_arena_generation(); + + // Restore auto-commit if it was enabled + drop(_self_guard); + drop(_other_guard); + self.renew_txn_if_auto_commit(self_options); + other.renew_txn_if_auto_commit(other_options); + } } // FIXME: PERF: This method is quite slow because it iterates all the changes diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index a85a67e18..d331f6f00 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -621,6 +621,44 @@ impl OpLog { max_timestamp } + + /// Swap the data-related contents of this OpLog with another. + /// + /// This swaps: + /// - `dag` - the AppDag containing version history + /// - `change_store` - the store containing all changes + /// - `history_cache` - the history cache + /// - `pending_changes` - pending changes waiting for deps + /// + /// This does NOT swap (these stay with the original doc): + /// - `arena` - arena contents are swapped separately via `SharedArena::swap_contents_with` + /// - `batch_importing` - stays with original doc + /// - `configure` - configuration stays with original doc + /// - `uncommitted_change` - must be None during swap + /// + /// # Panics + /// + /// Panics if either OpLog has an uncommitted change. + pub(crate) fn swap_data_with(&mut self, other: &mut OpLog) { + assert!( + self.uncommitted_change.is_none(), + "Cannot swap OpLog with uncommitted change" + ); + assert!( + other.uncommitted_change.is_none(), + "Cannot swap OpLog with uncommitted change" + ); + + // Swap the data fields + std::mem::swap(&mut self.dag, &mut other.dag); + std::mem::swap(&mut self.change_store, &mut other.change_store); + std::mem::swap(&mut self.pending_changes, &mut other.pending_changes); + + // Swap history cache contents + let mut self_cache = self.history_cache.lock().unwrap(); + let mut other_cache = other.history_cache.lock().unwrap(); + std::mem::swap(&mut *self_cache, &mut *other_cache); + } } #[derive(Debug)] diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index 1b9bacf0b..f97d93029 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -1605,6 +1605,45 @@ impl DocState { pub(crate) fn shallow_root_store(&self) -> Option<&Arc> { self.store.shallow_root_store() } + + /// Swap the data-related contents of this DocState with another. + /// + /// This swaps: + /// - `frontiers` - the version frontiers + /// - `store` - the container store with all container states + /// + /// This does NOT swap (these stay with the original doc): + /// - `peer` - peer ID stays with original doc + /// - `arena` - arena contents are swapped separately via `SharedArena::swap_contents_with` + /// - `config` - configuration stays with original doc + /// - `doc` - weak reference stays with original doc + /// - `in_txn`, `changed_idx_in_txn` - transaction state stays + /// - `event_recorder` - event recording state is cleared + /// - `dead_containers_cache` - cache is cleared + /// + /// # Panics + /// + /// Panics if either DocState is in a transaction. + pub(crate) fn swap_data_with(&mut self, other: &mut DocState) { + assert!(!self.in_txn, "Cannot swap DocState while in transaction"); + assert!(!other.in_txn, "Cannot swap DocState while in transaction"); + + // Swap the data fields + std::mem::swap(&mut self.frontiers, &mut other.frontiers); + std::mem::swap(&mut self.store, &mut other.store); + + // After swapping stores, update their arena references. + // The swapped stores still reference their original arenas, so we need + // to point them to the correct (now-swapped) arenas. + self.store.set_arena(self.arena.clone()); + other.store.set_arena(other.arena.clone()); + + // Clear caches and event recording state since they're now invalid + self.event_recorder = Default::default(); + self.dead_containers_cache = Default::default(); + other.event_recorder = Default::default(); + other.dead_containers_cache = Default::default(); + } } fn create_state_(idx: ContainerIdx, config: &Configure, peer: u64) -> State { diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index f37b09694..ff44ff2e2 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -243,6 +243,16 @@ impl ContainerStore { } } + /// Update the arena reference in this store and its inner store. + /// + /// This is needed after `DocState::swap_data_with` because the swapped store + /// still references the old arena. After swapping, we need to update the arena + /// reference to point to the correct (swapped) arena. + pub(crate) fn set_arena(&mut self, arena: SharedArena) { + self.arena = arena.clone(); + self.store.set_arena(arena); + } + #[allow(unused)] fn check_eq_after_parsing(&mut self, other: &mut ContainerStore) { for (idx, container) in self.store.iter_all_containers_mut() { diff --git a/crates/loro-internal/src/state/container_store/inner_store.rs b/crates/loro-internal/src/state/container_store/inner_store.rs index 7fe01c665..99556c90a 100644 --- a/crates/loro-internal/src/state/container_store/inner_store.rs +++ b/crates/loro-internal/src/state/container_store/inner_store.rs @@ -263,4 +263,12 @@ impl InnerStore { new_store.decode(bytes).unwrap(); new_store } + + /// Update the arena reference in this store. + /// + /// This is needed after `DocState::swap_data_with` because the swapped store + /// still references the old arena. + pub(crate) fn set_arena(&mut self, arena: SharedArena) { + self.arena = arena; + } } diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 99a5b0a33..ee750de87 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -1594,6 +1594,54 @@ impl LoroDoc { pub fn set_hide_empty_root_containers(&self, hide: bool) { self.doc.set_hide_empty_root_containers(hide); } + + /// Swap the internal data of this document with another document. + /// + /// This method atomically replaces the document's internal state (OpLog, DocState, Arena) + /// with the contents from another document, while preserving: + /// - The `Arc` wrappers (so existing references remain valid) + /// - Subscriptions and observers + /// - Configuration + /// - Peer ID + /// + /// After this call: + /// - `self` will contain the data that was in `other` + /// - `other` will contain the data that was in `self` + /// - The arena generation counter is bumped, invalidating cached `ContainerIdx` in handlers + /// + /// # Use Case + /// + /// This is primarily used by `replace_with_shallow` to atomically replace a document's + /// contents with a trimmed shallow snapshot while preserving all external references. + /// + /// # Panics + /// + /// - Panics if either document is in a transaction + /// - Panics if either document has an uncommitted change + /// + /// # Example + /// + /// ``` + /// use loro::{LoroDoc, ExportMode}; + /// + /// let doc = LoroDoc::new(); + /// doc.get_text("text").insert(0, "Hello").unwrap(); + /// doc.commit(); + /// let frontiers = doc.oplog_frontiers(); + /// let value_before = doc.get_deep_value(); + /// + /// // Create a shallow snapshot and swap it in + /// let shallow_bytes = doc.export(ExportMode::shallow_snapshot(&frontiers)).unwrap(); + /// let temp_doc = LoroDoc::new(); + /// temp_doc.import(&shallow_bytes).unwrap(); + /// doc.swap_internals_from(&temp_doc); + /// + /// // Now doc contains the shallow snapshot + /// assert_eq!(doc.get_deep_value(), value_before); + /// ``` + pub fn swap_internals_from(&self, other: &LoroDoc) { + self.doc.swap_internals_from(&other.doc); + } } /// It's used to prevent the user from implementing the trait directly. diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 6d27cfc57..faed7ea44 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -2029,6 +2029,137 @@ fn no_dead_loop_when_subscribe_local_updates_to_each_other() { assert_eq!(doc1.get_deep_value(), doc2.get_deep_value()); } +#[test] +#[parallel] +fn test_swap_internals_from_basic() { + // Create doc1 with some data + let doc1 = LoroDoc::new(); + doc1.set_peer_id(1).unwrap(); + let text1 = doc1.get_text("text"); + text1.insert(0, "Hello from doc1").unwrap(); + doc1.commit(); + let doc1_value_before = doc1.get_deep_value(); + let doc1_vv_before = doc1.oplog_vv(); + + // Create doc2 with different data + let doc2 = LoroDoc::new(); + doc2.set_peer_id(2).unwrap(); + let text2 = doc2.get_text("text"); + text2.insert(0, "Hello from doc2").unwrap(); + doc2.get_map("map").insert("key", "value").unwrap(); + doc2.commit(); + let doc2_value_before = doc2.get_deep_value(); + let doc2_vv_before = doc2.oplog_vv(); + + // Swap internals + doc1.swap_internals_from(&doc2); + + // Verify doc1 now has doc2's data + assert_eq!(doc1.get_deep_value(), doc2_value_before); + assert_eq!(doc1.oplog_vv(), doc2_vv_before); + + // Verify doc2 now has doc1's data + assert_eq!(doc2.get_deep_value(), doc1_value_before); + assert_eq!(doc2.oplog_vv(), doc1_vv_before); + + // Verify peer IDs are preserved (not swapped) + assert_eq!(doc1.peer_id(), 1); + assert_eq!(doc2.peer_id(), 2); +} + +#[test] +#[parallel] +fn test_swap_internals_from_preserves_peer_id() { + // Test that peer IDs are preserved after swap + let doc1 = LoroDoc::new(); + doc1.set_peer_id(1).unwrap(); + let text1 = doc1.get_text("text"); + text1.insert(0, "Initial").unwrap(); + doc1.commit(); + + // Create doc2 with different data + let doc2 = LoroDoc::new(); + doc2.set_peer_id(2).unwrap(); + doc2.get_text("text").insert(0, "From doc2").unwrap(); + doc2.commit(); + + // Swap internals + doc1.swap_internals_from(&doc2); + + // Verify peer IDs are preserved (not swapped) + assert_eq!(doc1.peer_id(), 1); + assert_eq!(doc2.peer_id(), 2); + + // Verify data was swapped + assert_eq!(doc1.get_text("text").to_string(), "From doc2"); + assert_eq!(doc2.get_text("text").to_string(), "Initial"); +} + +#[test] +#[parallel] +fn test_swap_internals_from_handler_invalidation() { + let doc1 = LoroDoc::new(); + doc1.set_peer_id(1).unwrap(); + let text1 = doc1.get_text("text"); + text1.insert(0, "Hello").unwrap(); + doc1.commit(); + + // Create doc2 with different data + let doc2 = LoroDoc::new(); + doc2.set_peer_id(2).unwrap(); + doc2.get_text("text").insert(0, "World").unwrap(); + doc2.commit(); + + // Swap internals + doc1.swap_internals_from(&doc2); + + // Get a fresh handler after swap - it should work with the new data + let text_after = doc1.get_text("text"); + assert_eq!(text_after.to_string(), "World"); + + // The old handler should be invalidated (generation mismatch) + // Operations on it should fail or use the new data + // Note: The exact behavior depends on how handler invalidation is implemented +} + +#[test] +#[parallel] +fn test_swap_internals_for_shallow_snapshot() { + // This test simulates the use case for replace_with_shallow + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + + // Create some history + for i in 0..10 { + doc.get_text("text").insert(i, &format!("{}", i)).unwrap(); + doc.commit(); + } + + let frontiers = doc.oplog_frontiers(); + let value_before = doc.get_deep_value(); + + // Export a shallow snapshot + let shallow_bytes = doc + .export(ExportMode::shallow_snapshot(&frontiers)) + .unwrap(); + + // Create a temp doc and import the shallow snapshot + let temp_doc = LoroDoc::new(); + temp_doc.import(&shallow_bytes).unwrap(); + + // Verify temp doc has the same value + assert_eq!(temp_doc.get_deep_value(), value_before); + + // Swap internals - this is what replace_with_shallow would do + doc.swap_internals_from(&temp_doc); + + // Verify doc still has the same value + assert_eq!(doc.get_deep_value(), value_before); + + // Verify doc is now shallow + assert!(doc.is_shallow()); +} + /// https://github.com/loro-dev/loro/issues/490 #[test] #[parallel] From c991bb0802673d14487b0d77d8574fbb4fbf43d6 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 22:34:46 -0700 Subject: [PATCH 4/7] feat: replace_with_shallow loro internal function Implemented replace_with_shallow method in crates/loro-internal/src/loro.rs that: - Exports a shallow snapshot at the given frontiers - Creates a temporary document from the snapshot via LoroDoc::from_snapshot - Calls swap_internals_from to swap the temp doc's internals into self Also exposed the method in the public API at crates/loro/src/lib.rs with documentation. Key Design Decisions - Builds on existing APIs: Uses export(ExportMode::shallow_snapshot) and from_snapshot rather than implementing custom shallow logic, following zxch3n's guidance for correctness. - Shallow snapshot semantics: The method exports the current state with history trimmed to start from the given frontiers. It does NOT export the state at those frontiers - this is how Loro's shallow snapshot works. - Peer ID preservation: The original document's peer ID is preserved after the swap (handled by swap_internals_from). Tests Added - test_replace_with_shallow_basic - Verifies document becomes shallow with correct state - test_replace_with_shallow_preserves_peer_id - Peer ID preservation - test_replace_with_shallow_can_continue_editing - Can edit and sync after replace - test_replace_with_shallow_at_intermediate_version - Intermediate version handling - test_replace_with_shallow_cloned_doc_independence - Cloned docs share the swapped state --- crates/loro-internal/src/loro.rs | 51 ++++++++++++ crates/loro/src/lib.rs | 43 ++++++++++ crates/loro/tests/loro_rust_test.rs | 121 ++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+) diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index db3888dd4..e25864c97 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -2089,6 +2089,57 @@ impl LoroDoc { self.renew_txn_if_auto_commit(self_options); other.renew_txn_if_auto_commit(other_options); } + + /// Replace the current document state with a shallow snapshot at the given frontiers. + /// + /// This method trims the history in place, discarding operations before the specified + /// frontiers while preserving: + /// - Subscriptions and observers + /// - Configuration + /// - Peer ID + /// - All existing references to the document + /// + /// After this call, the document will only contain history from the specified frontiers + /// onwards. The state at the frontiers becomes the new "shallow root" of the document. + /// + /// # Arguments + /// + /// * `frontiers` - The version to use as the new shallow root. All history before this + /// version will be discarded. + /// + /// # Errors + /// + /// Returns an error if: + /// - The frontiers are not found in the document's history + /// - The document cannot export a shallow snapshot at the given frontiers + /// + /// # Example + /// + /// ```ignore + /// let doc = LoroDoc::new(); + /// // ... perform many operations ... + /// + /// // Trim history to only keep operations from the current state onwards + /// let frontiers = doc.oplog_frontiers(); + /// doc.replace_with_shallow(&frontiers)?; + /// + /// // The document now has a shallow history starting from `frontiers` + /// assert!(doc.is_shallow()); + /// ``` + pub fn replace_with_shallow(&self, frontiers: &Frontiers) -> LoroResult<()> { + // Export a shallow snapshot at the given frontiers + let shallow_bytes = self + .export(ExportMode::shallow_snapshot(frontiers)) + .map_err(|e| LoroError::DecodeError(e.to_string().into()))?; + + // Create a temporary document and import the shallow snapshot + let temp_doc = LoroDoc::from_snapshot(&shallow_bytes)?; + + // Swap the internals - this preserves subscriptions, config, peer_id + self.swap_internals_from(&temp_doc); + + Ok(()) + } } // FIXME: PERF: This method is quite slow because it iterates all the changes diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index ee750de87..7a21c3898 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -1642,6 +1642,49 @@ impl LoroDoc { pub fn swap_internals_from(&self, other: &LoroDoc) { self.doc.swap_internals_from(&other.doc); } + + /// Replace the current document state with a shallow snapshot at the given frontiers. + /// + /// This method trims the history in place, discarding operations before the specified + /// frontiers while preserving: + /// - Subscriptions and observers + /// - Configuration + /// - Peer ID + /// - All existing references to the document + /// + /// After this call, the document will only contain history from the specified frontiers + /// onwards. The state at the frontiers becomes the new "shallow root" of the document. + /// + /// # Arguments + /// + /// * `frontiers` - The version to use as the new shallow root. All history before this + /// version will be discarded. + /// + /// # Errors + /// + /// Returns an error if: + /// - The frontiers are not found in the document's history + /// - The document cannot export a shallow snapshot at the given frontiers + /// + /// # Example + /// + /// ``` + /// use loro::LoroDoc; + /// + /// let doc = LoroDoc::new(); + /// doc.get_text("text").insert(0, "Hello").unwrap(); + /// doc.commit(); + /// + /// // Trim history to only keep operations from the current state onwards + /// let frontiers = doc.oplog_frontiers(); + /// doc.replace_with_shallow(&frontiers).unwrap(); + /// + /// // The document now has a shallow history starting from `frontiers` + /// assert!(doc.is_shallow()); + /// ``` + pub fn replace_with_shallow(&self, frontiers: &Frontiers) -> LoroResult<()> { + self.doc.replace_with_shallow(frontiers) + } } /// It's used to prevent the user from implementing the trait directly. diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index faed7ea44..0cf383c71 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -3699,3 +3699,124 @@ fn has_container_test() { assert!(doc.has_container(&text.id())); assert!(doc.has_container(&list.id())); } + +#[test] +#[parallel] +fn test_replace_with_shallow_basic() { + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + let frontiers_before = doc.oplog_frontiers(); + text.insert(5, " World").unwrap(); + doc.commit(); + + // Replace with shallow at the earlier frontiers + // Note: shallow_snapshot exports current state with history trimmed to start from frontiers + let value_before = doc.get_deep_value(); + doc.replace_with_shallow(&frontiers_before).unwrap(); + + // Document should be shallow now + assert!(doc.is_shallow()); + // Value should be preserved (current state, not state at frontiers) + assert_eq!(doc.get_deep_value(), value_before); + // Shallow since should be at the frontiers_before + assert_eq!(doc.shallow_since_frontiers(), frontiers_before); +} + +#[test] +#[parallel] +fn test_replace_with_shallow_preserves_peer_id() { + let doc = LoroDoc::new(); + let original_peer_id = 12345u64; + doc.set_peer_id(original_peer_id).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + + let frontiers = doc.oplog_frontiers(); + doc.replace_with_shallow(&frontiers).unwrap(); + + // Peer ID should be preserved + assert_eq!(doc.peer_id(), original_peer_id); +} + +#[test] +#[parallel] +fn test_replace_with_shallow_can_continue_editing() { + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + + let frontiers = doc.oplog_frontiers(); + doc.replace_with_shallow(&frontiers).unwrap(); + + // Should be able to continue editing after replace_with_shallow + doc.get_text("text").insert(5, " World").unwrap(); + doc.commit(); + + assert_eq!(doc.get_text("text").to_string(), "Hello World"); + + // Should be able to sync with another doc using snapshot (shallow docs need snapshot, not updates) + let doc2 = LoroDoc::new(); + let snapshot = doc.export(ExportMode::Snapshot).unwrap(); + doc2.import(&snapshot).unwrap(); + assert_eq!(doc2.get_text("text").to_string(), "Hello World"); +} + +#[test] +#[parallel] +fn test_replace_with_shallow_at_intermediate_version() { + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "A").unwrap(); + doc.commit(); + let frontiers_a = doc.oplog_frontiers(); + + text.insert(1, "B").unwrap(); + doc.commit(); + let frontiers_b = doc.oplog_frontiers(); + + text.insert(2, "C").unwrap(); + doc.commit(); + + // Replace at intermediate version (frontiers_a) + // Note: shallow_snapshot exports current state ("ABC") with history trimmed to start from frontiers_a + doc.replace_with_shallow(&frontiers_a).unwrap(); + + // Document should be shallow + assert!(doc.is_shallow()); + // Value should be the current state (ABC), not the state at frontiers_a + assert_eq!(doc.get_text("text").to_string(), "ABC"); + // Shallow since should be at frontiers_a + assert_eq!(doc.shallow_since_frontiers(), frontiers_a); + + // Can still checkout to frontiers_b (which is after shallow root) + doc.checkout(&frontiers_b).unwrap(); + assert_eq!(doc.get_text("text").to_string(), "AB"); +} + +#[test] +#[parallel] +fn test_replace_with_shallow_cloned_doc_independence() { + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + + // Clone the doc (creates a reference to the same inner) + let doc_clone = doc.clone(); + + let frontiers = doc.oplog_frontiers(); + doc.replace_with_shallow(&frontiers).unwrap(); + + // Both should see the shallow state since they share the same inner + assert!(doc.is_shallow()); + assert!(doc_clone.is_shallow()); + assert_eq!(doc.get_deep_value(), doc_clone.get_deep_value()); +} From d0ece616e80a0b43be87e8c21d1d11c85eda3d74 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 22:45:55 -0700 Subject: [PATCH 5/7] feat: subscription container_id storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **`crates/loro-internal/src/utils/subscription.rs`**: - Added `extract_all()` method to `SubscriberSet` - extracts all active subscriptions with their callbacks, enabling migration **`crates/loro-internal/src/subscription.rs`**: - Added `container_id_map: Mutex>` to `ObserverInner` - Modified `subscribe()` to track ContainerID for each ContainerIdx - Added `get_subscription_container_id(idx)` method for retrieval 1. **Mapping at Observer level**: Keeps SubscriberSet generic while adding Loro-specific tracking 2. **ContainerIdx → ContainerID direction**: Enables looking up the stable ID when extracting subscriptions keyed by ContainerIdx 3. **Thread-safe**: Uses Mutex since subscriptions can be created from multiple threads --- crates/loro-internal/src/subscription.rs | 25 +++++++++++++++++++ .../loro-internal/src/utils/subscription.rs | 21 ++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/crates/loro-internal/src/subscription.rs b/crates/loro-internal/src/subscription.rs index a7817273f..124645655 100644 --- a/crates/loro-internal/src/subscription.rs +++ b/crates/loro-internal/src/subscription.rs @@ -12,6 +12,7 @@ use smallvec::SmallVec; use std::{collections::VecDeque, sync::Arc}; use crate::sync::Mutex; + /// The callback of the local update. pub type LocalUpdateCallback = Box) -> bool + Send + Sync + 'static>; /// The callback of the peer id change. The second argument is the next counter for the peer. @@ -31,6 +32,11 @@ impl LoroDoc { struct ObserverInner { subscriber_set: SubscriberSet, Subscriber>, queue: Arc>>, + /// Maps ContainerIdx to ContainerID for subscription migration. + /// This allows us to re-resolve subscriptions after an arena swap. + /// - Key: ContainerIdx (the current index in the arena) + /// - Value: ContainerID (the stable identifier) + container_id_map: Mutex>, } impl Default for ObserverInner { @@ -38,6 +44,7 @@ impl Default for ObserverInner { Self { subscriber_set: SubscriberSet::new(), queue: Arc::new(Mutex::new(VecDeque::new())), + container_id_map: Mutex::new(FxHashMap::default()), } } } @@ -64,6 +71,12 @@ impl Observer { pub fn subscribe(&self, id: &ContainerID, callback: Subscriber) -> Subscription { let idx = self.arena.register_container(id); let inner = &self.inner; + // Track the ContainerID for this ContainerIdx to enable subscription migration + inner + .container_id_map + .lock() + .unwrap() + .insert(idx, id.clone()); let (sub, enable) = inner.subscriber_set.insert(Some(idx), callback); enable(); sub @@ -147,6 +160,18 @@ impl Observer { true } + + /// Get the ContainerID associated with a ContainerIdx. + /// This is used for subscription migration after arena swaps. + /// Returns None for root subscriptions or if the ContainerIdx is not tracked. + pub fn get_subscription_container_id(&self, idx: ContainerIdx) -> Option { + self.inner + .container_id_map + .lock() + .unwrap() + .get(&idx) + .cloned() + } } #[cfg(test)] diff --git a/crates/loro-internal/src/utils/subscription.rs b/crates/loro-internal/src/utils/subscription.rs index 568258a1d..cfbdcc11e 100644 --- a/crates/loro-internal/src/utils/subscription.rs +++ b/crates/loro-internal/src/utils/subscription.rs @@ -442,6 +442,27 @@ where pub fn may_include(&self, emitter: &EmitterKey) -> bool { self.0.lock().unwrap().subscribers.contains_key(emitter) } + + /// Extract all active subscriptions, removing them from the set. + /// Returns a vector of (emitter_key, callback) pairs. + /// This is useful for migrating subscriptions to a new context. + pub fn extract_all(&self) -> Vec<(EmitterKey, Callback)> { + let mut lock = self.0.lock().unwrap(); + let mut result = Vec::new(); + + for (key, subscribers) in std::mem::take(&mut lock.subscribers) { + if let Either::Left(subs) = subscribers { + for (_, subscriber) in subs { + if subscriber.active.load(Ordering::Relaxed) { + result.push((key.clone(), subscriber.callback)); + } + // The InnerSubscription (_sub) is dropped here, which clears the unsubscribe callback + } + } + } + + result + } } impl Default for SubscriberSet From 2a8c952366a9040ed4f59b7dd492cfb58b061a18 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Thu, 22 Jan 2026 23:32:34 -0700 Subject: [PATCH 6/7] feat: preserve subscriptions on replace_with_shallow Fixed subscription preservation in `replace_with_shallow` by: 1. Adding `extract_subscriptions()` and `restore_subscriptions()` methods to `Observer` 2. Modifying `DocState::swap_data_with()` to preserve the `recording_diff` state 3. Integrating subscription migration into `LoroDocInner::replace_with_shallow()` 1. **ContainerID-based migration**: Subscriptions are extracted with their `ContainerID` (stable identifier), then restored by re-resolving to new `ContainerIdx` values after the arena swap 2. **Detached subscriptions**: Restored subscriptions are detached (no unsubscribe handle returned) - the original user handles become orphaned but harmless 3. **Recording state preservation**: The `event_recorder.recording_diff` flag must be preserved across the swap, otherwise events won't be recorded and subscriptions won't fire - `test_replace_with_shallow_preserves_subscriptions` - verifies root subscriptions fire after replace - `test_replace_with_shallow_preserves_container_subscriptions` - verifies container-specific subscriptions fire after replace - [`crates/loro-internal/src/subscription.rs`](crates/loro-internal/src/subscription.rs) - added `extract_subscriptions()`, `restore_subscriptions()`, `container_id_map` tracking - [`crates/loro-internal/src/state.rs`](crates/loro-internal/src/state.rs) - fixed `swap_data_with()` to preserve recording state - [`crates/loro-internal/src/loro.rs`](crates/loro-internal/src/loro.rs) - integrated subscription migration in `replace_with_shallow()` - [`crates/loro-internal/src/utils/subscription.rs`](crates/loro-internal/src/utils/subscription.rs) - added `extract_all()` method to `SubscriberSet` --- crates/loro-internal/src/loro.rs | 12 +++- crates/loro-internal/src/state.rs | 12 ++++ crates/loro-internal/src/subscription.rs | 52 ++++++++++++++ crates/loro/tests/loro_rust_test.rs | 87 ++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 1 deletion(-) diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index e25864c97..700cca570 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -2135,9 +2135,19 @@ impl LoroDoc { // Create a temporary document and import the shallow snapshot let temp_doc = LoroDoc::from_snapshot(&shallow_bytes)?; - // Swap the internals - this preserves subscriptions, config, peer_id + // Extract subscriptions before the swap so we can restore them after + // This captures the ContainerID for each subscription so we can re-resolve + // the ContainerIdx after the arena is swapped + let subscriptions = self.observer.extract_subscriptions(); + + // Swap the internals - this preserves the observer Arc, config, peer_id + // but the arena contents are swapped, invalidating ContainerIdx values self.swap_internals_from(&temp_doc); + // Restore subscriptions with the new arena + // This re-resolves ContainerIDs to new ContainerIdx values + self.observer.restore_subscriptions(subscriptions); + Ok(()) } } diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index f97d93029..0dd5fc85c 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -1628,6 +1628,10 @@ impl DocState { assert!(!self.in_txn, "Cannot swap DocState while in transaction"); assert!(!other.in_txn, "Cannot swap DocState while in transaction"); + // Remember if we were recording before the swap + let self_was_recording = self.is_recording(); + let other_was_recording = other.is_recording(); + // Swap the data fields std::mem::swap(&mut self.frontiers, &mut other.frontiers); std::mem::swap(&mut self.store, &mut other.store); @@ -1643,6 +1647,14 @@ impl DocState { self.dead_containers_cache = Default::default(); other.event_recorder = Default::default(); other.dead_containers_cache = Default::default(); + + // Restore recording state if it was active before the swap + if self_was_recording { + self.start_recording(); + } + if other_was_recording { + other.start_recording(); + } } } diff --git a/crates/loro-internal/src/subscription.rs b/crates/loro-internal/src/subscription.rs index 124645655..52ed87e1b 100644 --- a/crates/loro-internal/src/subscription.rs +++ b/crates/loro-internal/src/subscription.rs @@ -172,6 +172,58 @@ impl Observer { .get(&idx) .cloned() } + + /// Extract all subscriptions from the observer, removing them. + /// Returns a vector of (Option, Subscriber) pairs. + /// - None ContainerID means a root subscription + /// - Some(ContainerID) means a container-specific subscription + /// + /// This is used for subscription migration during `replace_with_shallow`. + pub fn extract_subscriptions(&self) -> Vec<(Option, Subscriber)> { + let extracted = self.inner.subscriber_set.extract_all(); + let container_id_map = self.inner.container_id_map.lock().unwrap(); + + extracted + .into_iter() + .map(|(key, callback)| { + let container_id = match key { + None => None, // Root subscription + Some(idx) => container_id_map.get(&idx).cloned(), + }; + (container_id, callback) + }) + .collect() + } + + /// Restore subscriptions that were previously extracted. + /// This re-registers each subscription with the current arena, + /// resolving ContainerIDs to new ContainerIdx values. + /// + /// Note: The returned Subscriptions are detached (no unsubscribe handle is returned). + /// This is intentional - the original Subscription handles held by users remain valid + /// but will no longer unsubscribe (since the original subscriptions were extracted). + pub fn restore_subscriptions(&self, subscriptions: Vec<(Option, Subscriber)>) { + let mut container_id_map = self.inner.container_id_map.lock().unwrap(); + + for (container_id, callback) in subscriptions { + match container_id { + None => { + // Root subscription - no ContainerIdx needed + let (sub, enable) = self.inner.subscriber_set.insert(None, callback); + enable(); + sub.detach(); // Detach so it stays active + } + Some(id) => { + // Container-specific subscription - resolve to new ContainerIdx + let idx = self.arena.register_container(&id); + container_id_map.insert(idx, id); + let (sub, enable) = self.inner.subscriber_set.insert(Some(idx), callback); + enable(); + sub.detach(); // Detach so it stays active + } + } + } + } } #[cfg(test)] diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 0cf383c71..d7c6a7b8f 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -3820,3 +3820,90 @@ fn test_replace_with_shallow_cloned_doc_independence() { assert!(doc_clone.is_shallow()); assert_eq!(doc.get_deep_value(), doc_clone.get_deep_value()); } + +#[test] +#[parallel] +fn test_replace_with_shallow_preserves_subscriptions() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + + // Set up a subscription + let event_count = Arc::new(AtomicUsize::new(0)); + let event_count_clone = event_count.clone(); + let _sub = doc.subscribe_root(Arc::new(move |_e| { + event_count_clone.fetch_add(1, Ordering::SeqCst); + })); + + // Verify subscription works before replace_with_shallow + doc.get_text("text").insert(5, " World").unwrap(); + doc.commit(); + assert!( + event_count.load(Ordering::SeqCst) >= 1, + "Subscription should fire before replace_with_shallow" + ); + + let count_before_replace = event_count.load(Ordering::SeqCst); + + // Replace with shallow + let frontiers = doc.oplog_frontiers(); + doc.replace_with_shallow(&frontiers).unwrap(); + + // Verify subscription still works after replace_with_shallow + doc.get_text("text").insert(11, "!").unwrap(); + doc.commit(); + + assert!( + event_count.load(Ordering::SeqCst) > count_before_replace, + "Subscription should still fire after replace_with_shallow" + ); +} + +#[test] +#[parallel] +fn test_replace_with_shallow_preserves_container_subscriptions() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + let text = doc.get_text("text"); + text.insert(0, "Hello").unwrap(); + doc.commit(); + + // Set up a container-specific subscription + let event_count = Arc::new(AtomicUsize::new(0)); + let event_count_clone = event_count.clone(); + let _sub = doc.subscribe( + &text.id(), + Arc::new(move |_e| { + event_count_clone.fetch_add(1, Ordering::SeqCst); + }), + ); + + // Verify subscription works before replace_with_shallow + doc.get_text("text").insert(5, " World").unwrap(); + doc.commit(); + assert!( + event_count.load(Ordering::SeqCst) >= 1, + "Container subscription should fire before replace_with_shallow" + ); + + let count_before_replace = event_count.load(Ordering::SeqCst); + + // Replace with shallow + let frontiers = doc.oplog_frontiers(); + doc.replace_with_shallow(&frontiers).unwrap(); + + // Verify subscription still works after replace_with_shallow + doc.get_text("text").insert(11, "!").unwrap(); + doc.commit(); + + assert!( + event_count.load(Ordering::SeqCst) > count_before_replace, + "Container subscription should still fire after replace_with_shallow" + ); +} From 50aef14640fdb520081f04aa511b98724eadc740 Mon Sep 17 00:00:00 2001 From: Duane Johnson Date: Fri, 23 Jan 2026 07:03:05 -0700 Subject: [PATCH 7/7] feat: add replaceWithShallow to loro-wasm --- crates/loro-wasm/index.ts | 1 + crates/loro-wasm/src/lib.rs | 31 +- .../tests/replace_with_shallow.test.ts | 851 ++++++++++++++++++ 3 files changed, 882 insertions(+), 1 deletion(-) create mode 100644 crates/loro-wasm/tests/replace_with_shallow.test.ts diff --git a/crates/loro-wasm/index.ts b/crates/loro-wasm/index.ts index 51b31103a..866f369c8 100644 --- a/crates/loro-wasm/index.ts +++ b/crates/loro-wasm/index.ts @@ -502,6 +502,7 @@ decorateMethods(LoroDoc.prototype, [ "commit", "getCursorPos", "revertTo", + "replaceWithShallow", "export", "exportJsonUpdates", "exportJsonInIdSpan", diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index c0421cc93..6fec02fbc 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -1348,6 +1348,34 @@ impl LoroDoc { self.doc.is_shallow() } + /// Replace the current document state with a shallow snapshot at the given frontiers. + /// + /// This method trims the history in place, preserving subscriptions and configuration. + /// After calling this method, the document will only contain history from the specified + /// frontiers onwards. + /// + /// @param frontiers - The frontiers to trim history to + /// + /// @example + /// ```ts + /// import { LoroDoc } from "loro-crdt"; + /// + /// const doc = new LoroDoc(); + /// const text = doc.getText("text"); + /// text.insert(0, "Hello World!"); + /// doc.commit(); + /// const frontiers = doc.frontiers(); + /// // ... more operations ... + /// // Trim history to the frontiers, discarding earlier operations + /// doc.replaceWithShallow(frontiers); + /// ``` + #[wasm_bindgen(js_name = "replaceWithShallow")] + pub fn replace_with_shallow(&self, frontiers: Vec) -> JsResult<()> { + let frontiers = ids_to_frontiers(frontiers)?; + self.doc.replace_with_shallow(&frontiers)?; + Ok(()) + } + /// The doc only contains the history since this version /// /// This is empty if the doc is not shallow. @@ -1519,7 +1547,8 @@ impl LoroDoc { let json = if IN_PRE_COMMIT_CALLBACK.with(|f| f.get()) { self.doc.export_json_in_id_span(id_span) } else { - self.doc.with_barrier(|| self.doc.export_json_in_id_span(id_span)) + self.doc + .with_barrier(|| self.doc.export_json_in_id_span(id_span)) }; let s = serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true); let v = json diff --git a/crates/loro-wasm/tests/replace_with_shallow.test.ts b/crates/loro-wasm/tests/replace_with_shallow.test.ts new file mode 100644 index 000000000..d8f2e2f92 --- /dev/null +++ b/crates/loro-wasm/tests/replace_with_shallow.test.ts @@ -0,0 +1,851 @@ +import { describe, expect, it, vi } from "vitest"; +import { + LoroDoc, + LoroMap, + type LoroEventBatch, + type TextDiff, +} from "../bundler/index"; + +function oneMs(): Promise { + return new Promise((r) => setTimeout(r)); +} + +describe("replaceWithShallow", () => { + describe("basic functionality", () => { + it("preserves document data", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + const text = doc.getText("text"); + text.insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + text.insert(5, " World"); + doc.commit(); + + const valueBefore = doc.toJSON(); + doc.replaceWithShallow(frontiersAfterHello); + + expect(doc.toJSON()).toEqual(valueBefore); + }); + + it("makes document shallow", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + expect(doc.isShallow()).toBe(false); + + doc.replaceWithShallow(frontiersAfterHello); + + expect(doc.isShallow()).toBe(true); + }); + + it("returns correct shallowSinceVV", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersBefore = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersBefore); + + const shallowVV = doc.shallowSinceVV().toJSON(); + // "Hello" is 5 chars (counter 0-4), so shallowSinceVV is 4 + expect(shallowVV.get("1")).toBe(4); + }); + + it("preserves peer ID after replace", () => { + const doc = new LoroDoc(); + const originalPeerId = "12345"; + doc.setPeerId(originalPeerId); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + expect(doc.peerId).toBe(BigInt(originalPeerId)); + }); + }); + + describe("continued editing", () => { + it("can insert after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + + expect(doc.getText("text").toString()).toBe("Hello World!"); + }); + + it("can sync with other documents via snapshot", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + + const doc2 = new LoroDoc(); + doc2.import(doc.export({ mode: "snapshot" })); + + expect(doc2.getText("text").toString()).toBe("Hello World!"); + }); + + it("can create new root container after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + const newMap = doc.getMap("brandNewMap"); + newMap.set("key", "value"); + doc.commit(); + + expect(doc.getMap("brandNewMap").get("key")).toBe("value"); + }); + + it("can create nested containers after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + const list = doc.getList("newList"); + const nestedMap = list.insertContainer(0, new LoroMap()); + nestedMap.set("nested", "value"); + doc.commit(); + + expect( + (doc.getList("newList").get(0) as LoroMap).get("nested"), + ).toBe("value"); + }); + + it("can create multiple new containers after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getMap("map1").set("a", 1); + doc.getMap("map2").set("b", 2); + doc.getList("list1").insert(0, "x"); + doc.getText("text2").insert(0, "y"); + doc.commit(); + + expect(doc.getMap("map1").get("a")).toBe(1); + expect(doc.getMap("map2").get("b")).toBe(2); + expect(doc.getList("list1").get(0)).toBe("x"); + expect(doc.getText("text2").toString()).toBe("y"); + }); + }); + + describe("subscriptions", () => { + it("doc-level subscriptions continue to fire", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + let called = 0; + doc.subscribe(() => { + called += 1; + }); + + doc.getText("text").insert(0, "Hello"); + doc.commit(); + await oneMs(); + expect(called).toBeGreaterThan(0); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + await oneMs(); + + const countBefore = called; + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + await oneMs(); + expect(called).toBeGreaterThan(countBefore); + }); + + it("container subscriptions continue to fire", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + const text = doc.getText("text"); + let called = 0; + text.subscribe(() => { + called += 1; + }); + + text.insert(0, "Hello"); + doc.commit(); + await oneMs(); + expect(called).toBeGreaterThan(0); + const frontiersAfterHello = doc.oplogFrontiers(); + + text.insert(5, " World"); + doc.commit(); + await oneMs(); + + const countBefore = called; + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + await oneMs(); + expect(called).toBeGreaterThan(countBefore); + }); + + it("unsubscribe works after replace", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + let called = 0; + const unsub = doc.subscribe(() => { + called += 1; + }); + + doc.getText("text").insert(0, "Hello"); + doc.commit(); + await oneMs(); + expect(called).toBeGreaterThan(0); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + await oneMs(); + + doc.replaceWithShallow(frontiersAfterHello); + + const countAfterReplace = called; + unsub(); + + doc.getText("text").insert(11, "!"); + doc.commit(); + await oneMs(); + expect(called).toBe(countAfterReplace); + }); + + it("subscribeLocalUpdates continues to work", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + let updateCount = 0; + doc.subscribeLocalUpdates(() => { + updateCount += 1; + }); + + doc.getText("text").insert(0, "Hello"); + doc.commit(); + expect(updateCount).toBe(1); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + expect(updateCount).toBe(2); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + expect(updateCount).toBe(3); + }); + + it("does not trigger LORO_INTERNAL_ERROR", async () => { + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => { }); + try { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.subscribe(() => { }); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + await Promise.resolve(); + + expect( + errorSpy.mock.calls.some((args) => + args.some((arg) => String(arg).includes("[LORO_INTERNAL_ERROR]")), + ), + ).toBe(false); + } finally { + errorSpy.mockRestore(); + } + }); + + it("event content is correct after replace", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + const text = doc.getText("text"); + text.insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + text.insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + let lastEvent: LoroEventBatch | undefined; + doc.subscribe((event) => { + lastEvent = event; + }); + + text.insert(11, "!"); + doc.commit(); + await oneMs(); + + expect(lastEvent).toBeDefined(); + if (!lastEvent) throw new Error('lastEvent should be defined') + + expect(lastEvent.events[0].target).toBe(text.id); + expect(lastEvent.events[0].path).toStrictEqual(["text"]); + expect(lastEvent.events[0].diff).toStrictEqual({ + type: "text", + diff: [{ retain: 11 }, { insert: "!" }], + } as TextDiff); + }); + + it("container subscription event content is correct after replace", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + const text = doc.getText("text"); + text.insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + text.insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + let lastEvent: LoroEventBatch | undefined; + text.subscribe((event) => { + lastEvent = event; + }); + + text.insert(0, "Say: "); + doc.commit(); + await oneMs(); + + expect(lastEvent).toBeDefined(); + if (!lastEvent) throw new Error('lastEvent should be defined') + + expect(lastEvent.events[0].target).toBe(text.id); + expect(lastEvent.events[0].diff).toStrictEqual({ + type: "text", + diff: [{ insert: "Say: " }], + } as TextDiff); + }); + }); + + describe("handlers", () => { + it("existing handlers remain valid", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + const text = doc.getText("text"); + text.insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + text.insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + const textAfter = doc.getText("text"); + expect(textAfter.toString()).toBe("Hello World"); + + textAfter.insert(11, "!"); + doc.commit(); + expect(textAfter.toString()).toBe("Hello World!"); + }); + + it("can get new handlers after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + const map = doc.getMap("newMap"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + map.set("key", "value"); + doc.commit(); + + expect(doc.getMap("newMap").get("key")).toBe("value"); + }); + }); + + describe("versions and frontiers", () => { + it("oplogFrontiers returns correct value", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersBefore = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + const frontiersAfter = doc.oplogFrontiers(); + + doc.replaceWithShallow(frontiersBefore); + + const frontiersAfterReplace = doc.oplogFrontiers(); + expect(frontiersAfterReplace).toEqual(frontiersAfter); + }); + + it("can checkout within shallow range", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "A"); + doc.commit(); + const frontiersA = doc.oplogFrontiers(); + + doc.getText("text").insert(1, "B"); + doc.commit(); + const frontiersB = doc.oplogFrontiers(); + + doc.getText("text").insert(2, "C"); + doc.commit(); + + doc.replaceWithShallow(frontiersA); + + doc.checkout(frontiersB); + expect(doc.getText("text").toString()).toBe("AB"); + }); + + it("throws when checking out before shallow root", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "A"); + doc.commit(); + + doc.getText("text").insert(1, "B"); + doc.commit(); + const frontiersB = doc.oplogFrontiers(); + + doc.getText("text").insert(2, "C"); + doc.commit(); + + doc.replaceWithShallow(frontiersB); + + expect(() => { + doc.checkout([{ peer: "1", counter: 0 }]); + }).toThrow(); + }); + + it("revertTo before shallow root throws", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "A"); + doc.commit(); + const frontiersA = doc.oplogFrontiers(); + + doc.getText("text").insert(1, "B"); + doc.commit(); + const frontiersB = doc.oplogFrontiers(); + + doc.getText("text").insert(2, "C"); + doc.commit(); + + doc.replaceWithShallow(frontiersB); + + expect(() => { + doc.revertTo(frontiersA); + }).toThrow(); + }); + + it("revertTo within shallow range works", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "A"); + doc.commit(); + const frontiersA = doc.oplogFrontiers(); + + doc.getText("text").insert(1, "B"); + doc.commit(); + const frontiersB = doc.oplogFrontiers(); + + doc.getText("text").insert(2, "C"); + doc.commit(); + + doc.replaceWithShallow(frontiersA); + + doc.revertTo(frontiersB); + doc.commit(); + + expect(doc.getText("text").toString()).toBe("AB"); + }); + }); + + describe("export/import", () => { + it("can export snapshot after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + const snapshot = doc.export({ mode: "snapshot" }); + expect(snapshot.length).toBeGreaterThan(0); + + const doc2 = new LoroDoc(); + doc2.import(snapshot); + expect(doc2.getText("text").toString()).toBe("Hello World"); + }); + + it("can export updates after replace", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + + const updates = doc.export({ mode: "update" }); + expect(updates.length).toBeGreaterThan(0); + }); + + it("other docs can import from replaced doc", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + + const doc2 = new LoroDoc(); + doc2.import(doc.export({ mode: "snapshot" })); + expect(doc2.getText("text").toString()).toBe("Hello World!"); + expect(doc2.isShallow()).toBe(true); + }); + + it("export updates from shallow doc can be imported", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " World"); + doc.commit(); + + doc.replaceWithShallow(frontiersAfterHello); + + doc.getText("text").insert(11, "!"); + doc.commit(); + + const updates = doc.export({ mode: "update" }); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.import(doc.export({ mode: "snapshot" })); + + expect(doc2.getText("text").toString()).toBe("Hello World!"); + expect(updates.length).toBeGreaterThan(0); + }); + }); + + describe("concurrent editing", () => { + it("shallow doc syncs with non-shallow doc via snapshot", () => { + const doc1 = new LoroDoc(); + doc1.setPeerId("1"); + doc1.getText("text").insert(0, "Hello"); + doc1.commit(); + const frontiersAfterHello = doc1.oplogFrontiers(); + + doc1.getText("text").insert(5, " there"); + doc1.commit(); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.import(doc1.export({ mode: "snapshot" })); + + doc1.replaceWithShallow(frontiersAfterHello); + + doc2.getText("text").insert(11, " World"); + doc2.commit(); + + doc1.import(doc2.export({ mode: "snapshot" })); + + expect(doc1.getText("text").toString()).toBe("Hello there World"); + }); + + it("non-shallow doc syncs with shallow doc via snapshot", () => { + const doc1 = new LoroDoc(); + doc1.setPeerId("1"); + doc1.getText("text").insert(0, "Hello"); + doc1.commit(); + const frontiersAfterHello = doc1.oplogFrontiers(); + + doc1.getText("text").insert(5, " there"); + doc1.commit(); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.import(doc1.export({ mode: "snapshot" })); + + doc1.replaceWithShallow(frontiersAfterHello); + + doc1.getText("text").insert(11, " World"); + doc1.commit(); + + doc2.import(doc1.export({ mode: "snapshot" })); + + expect(doc2.getText("text").toString()).toBe("Hello there World"); + }); + + it("two peers edit after one does replaceWithShallow", () => { + const doc1 = new LoroDoc(); + doc1.setPeerId("1"); + doc1.getText("text").insert(0, "Hello"); + doc1.commit(); + const frontiersAfterHello = doc1.oplogFrontiers(); + + doc1.getText("text").insert(5, " there"); + doc1.commit(); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.import(doc1.export({ mode: "snapshot" })); + + doc1.replaceWithShallow(frontiersAfterHello); + + doc1.getText("text").insert(11, "!"); + doc1.commit(); + + doc2.getText("text").insert(0, "Say: "); + doc2.commit(); + + const snapshot1 = doc1.export({ mode: "snapshot" }); + const snapshot2 = doc2.export({ mode: "snapshot" }); + + doc1.import(snapshot2); + doc2.import(snapshot1); + + expect(doc1.getText("text").toString()).toBe( + doc2.getText("text").toString(), + ); + expect(doc1.getText("text").toString()).toContain("Say:"); + expect(doc1.getText("text").toString()).toContain("Hello"); + expect(doc1.getText("text").toString()).toContain("!"); + }); + + it("conflict resolution across shallow boundary", () => { + const doc1 = new LoroDoc(); + doc1.setPeerId("1"); + doc1.getText("text").insert(0, "AB"); + doc1.commit(); + const frontiersAfterAB = doc1.oplogFrontiers(); + + doc1.getText("text").insert(2, "CD"); + doc1.commit(); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.import(doc1.export({ mode: "snapshot" })); + + doc1.replaceWithShallow(frontiersAfterAB); + + doc1.getText("text").insert(1, "X"); + doc1.commit(); + + doc2.getText("text").insert(1, "Y"); + doc2.commit(); + + doc1.import(doc2.export({ mode: "snapshot" })); + doc2.import(doc1.export({ mode: "snapshot" })); + + expect(doc1.getText("text").toString()).toBe( + doc2.getText("text").toString(), + ); + expect(doc1.getText("text").toString()).toContain("X"); + expect(doc1.getText("text").toString()).toContain("Y"); + }); + }); + + describe("cloned document independence", () => { + it("fork before replace creates independent doc", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + const frontiersAfterHello = doc.oplogFrontiers(); + + doc.getText("text").insert(5, " there"); + doc.commit(); + + const forked = doc.fork(); + + doc.replaceWithShallow(frontiersAfterHello); + + expect(doc.isShallow()).toBe(true); + expect(forked.isShallow()).toBe(false); + + doc.getText("text").insert(11, " World"); + doc.commit(); + expect(forked.getText("text").toString()).toBe("Hello there"); + }); + }); + + describe("size reduction", () => { + it("reduces snapshot size", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + + const text = doc.getText("text"); + text.insert(0, "Initial"); + doc.commit(); + const frontiersAfterInitial = doc.oplogFrontiers(); + + for (let i = 0; i < 10; i++) { + doc.setPeerId(BigInt(i + 1)); + text.insert(text.length, `Line ${i}\n`); + doc.commit(); + } + + const snapshotSizeBefore = doc.export({ mode: "snapshot" }).length; + + doc.replaceWithShallow(frontiersAfterInitial); + + const snapshotSizeAfter = doc.export({ mode: "snapshot" }).length; + + expect(snapshotSizeAfter).toBeLessThanOrEqual(snapshotSizeBefore); + expect(doc.isShallow()).toBe(true); + }); + + it("trimming at earlier version reduces size more", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + + const text = doc.getText("text"); + text.insert(0, "A"); + doc.commit(); + const frontiersEarly = doc.oplogFrontiers(); + + for (let i = 0; i < 10; i++) { + text.insert(text.length, `${i}`); + doc.commit(); + } + const frontiersLate = doc.oplogFrontiers(); + + const docEarly = doc.fork(); + const docLate = doc.fork(); + + docEarly.replaceWithShallow(frontiersEarly); + docLate.replaceWithShallow(frontiersLate); + + const sizeEarly = docEarly.export({ mode: "snapshot" }).length; + const sizeLate = docLate.export({ mode: "snapshot" }).length; + + expect(sizeEarly).toBeLessThanOrEqual(sizeLate); + expect(docEarly.toJSON()).toEqual(docLate.toJSON()); + }); + + it("reduces change count with multiple peers", () => { + const doc = new LoroDoc(); + + const text = doc.getText("text"); + for (let i = 0; i < 5; i++) { + doc.setPeerId(BigInt(i + 1)); + text.insert(text.length, `${i}`); + doc.commit(); + } + + const changeCountBefore = doc.changeCount(); + expect(changeCountBefore).toBeGreaterThan(1); + + // Use current frontiers - shallow snapshot collapses all changes into one + const frontiers = doc.oplogFrontiers(); + doc.replaceWithShallow(frontiers); + + const changeCountAfter = doc.changeCount(); + expect(changeCountAfter).toBe(1); + expect(changeCountAfter).toBeLessThan(changeCountBefore); + }); + }); + + describe("error cases", () => { + it("throws on invalid frontiers", () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.getText("text").insert(0, "Hello"); + doc.commit(); + + expect(() => { + doc.replaceWithShallow([{ peer: "999", counter: 100 }]); + }).toThrow(); + }); + }); +});