Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions crates/loro-internal/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,4 +763,161 @@ impl SharedArena {
let mut slot = self.inner.parent_resolver.lock().unwrap();
*slot = resolver.map(|f| Arc::new(f) as Arc<ParentResolver>);
}

pub fn check_is_the_same(&self, other: &Self) {
let self_guards = self.get_arena_guards();
let other_guards = other.get_arena_guards();

// Compare container_idx_to_id
assert_eq!(
*self_guards.container_idx_to_id, *other_guards.container_idx_to_id,
"container_idx_to_id mismatch"
);

// Compare container_id_to_idx
assert_eq!(
*self_guards.container_id_to_idx, *other_guards.container_id_to_idx,
"container_id_to_idx mismatch"
);

// Compare parents
assert_eq!(
*self_guards.parents, *other_guards.parents,
"parents mismatch"
);

// Compare root_c_idx
assert_eq!(
*self_guards.root_c_idx, *other_guards.root_c_idx,
"root_c_idx mismatch"
);

// Compare depth
assert_eq!(*self_guards.depth, *other_guards.depth, "depth mismatch");
}

/// Clone container mappings to a new arena, preserving indices
pub fn clone_container_mappings_to(&self, target: &SharedArena) {
let source_ids = self.inner.container_idx_to_id.lock().unwrap();
let source_parents = self.inner.parents.lock().unwrap();

target.with_guards(|guards| {
// Register in exact same order to preserve indices
for id in source_ids.iter() {
guards.register_container(id);
}
// Copy parent relationships
for (child, parent) in source_parents.iter() {
guards.set_parent(*child, *parent);
}
});
}

pub fn swap_inner_contents(&self, other: &SharedArena) {
{
let mut a = self.inner.container_idx_to_id.lock().unwrap();
let mut b = other.inner.container_idx_to_id.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.depth.lock().unwrap();
let mut b = other.inner.depth.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.container_id_to_idx.lock().unwrap();
let mut b = other.inner.container_id_to_idx.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.parents.lock().unwrap();
let mut b = other.inner.parents.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.values.lock().unwrap();
let mut b = other.inner.values.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.root_c_idx.lock().unwrap();
let mut b = other.inner.root_c_idx.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.str.lock().unwrap();
let mut b = other.inner.str.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
{
let mut a = self.inner.parent_resolver.lock().unwrap();
let mut b = other.inner.parent_resolver.lock().unwrap();
std::mem::swap(&mut *a, &mut *b);
}
}
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::container::ContainerType;

    /// Root containers keep their indices when the mappings are cloned into
    /// a fresh arena.
    #[test]
    fn test_clone_container_mappings_flat() {
        let source = SharedArena::new();
        let text_id = ContainerID::new_root("root1", ContainerType::Text);
        let list_id = ContainerID::new_root("root2", ContainerType::List);
        let text_idx = source.register_container(&text_id);
        let list_idx = source.register_container(&list_id);

        let copy = SharedArena::new();
        source.clone_container_mappings_to(&copy);

        let expected = [(&text_id, text_idx), (&list_id, list_idx)];

        // Lookups in the copy resolve to the source's indices.
        for (id, idx) in expected {
            assert_eq!(copy.id_to_idx(id), Some(idx));
        }

        // Verify indices match
        for (id, idx) in expected {
            assert_eq!(copy.register_container(id), idx);
        }
    }

    /// A child container's index and its parent link both survive the clone.
    #[test]
    fn test_clone_container_mappings_nested() {
        let source = SharedArena::new();
        let root_id = ContainerID::new_root("root", ContainerType::Map);
        let root_idx = source.register_container(&root_id);
        let child_id = ContainerID::new_normal(crate::id::ID::new(0, 0), ContainerType::List);
        let child_idx = source.register_container(&child_id);
        source.set_parent(child_idx, Some(root_idx));

        let copy = SharedArena::new();
        source.clone_container_mappings_to(&copy);

        assert_eq!(copy.id_to_idx(&root_id), Some(root_idx));
        assert_eq!(copy.id_to_idx(&child_id), Some(child_idx));
        assert_eq!(copy.get_parent(child_idx), Some(root_idx));
    }

    /// After a swap each arena resolves only the container that was
    /// registered in the other one.
    #[test]
    fn test_swap_inner_contents() {
        let left = SharedArena::new();
        let left_id = ContainerID::new_root("root1", ContainerType::Text);
        left.register_container(&left_id);
        left.alloc_str("hello");

        let right = SharedArena::new();
        let right_id = ContainerID::new_root("root2", ContainerType::List);
        right.register_container(&right_id);
        right.alloc_str("world");

        left.swap_inner_contents(&right);

        // `left` should have what `right` started with
        assert!(left.id_to_idx(&right_id).is_some());
        assert!(left.id_to_idx(&left_id).is_none());

        // `right` should have what `left` started with
        assert!(right.id_to_idx(&left_id).is_some());
        assert!(right.id_to_idx(&right_id).is_none());
    }
}
97 changes: 95 additions & 2 deletions crates/loro-internal/src/loro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,89 @@ impl LoroDoc {
self._apply_diff(diff, &mut Default::default(), false)
}

pub fn replace_with_shallow(&self, frontiers: &Frontiers) -> LoroResult<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea for this part of the implementation is that internally we can first directly call the ExposedShallowSnapshot API to export a snapshot, and then import it into a new document. The new document can be created directly via the FromSnapshot method.

The subsequent steps would include:

  1. Migrate metadata and state:

    • (a) Migrate subscriptions, events, and similar components from the old document to the new one.
    • (b) Assign the Peer ID to the new document.
    • (c) Migrate all relevant state to the new document.
  2. Replace pointers:

    • (a) After the state migration is complete, directly replace the old document’s internal pointer with the pointer from the new document.
    • (b) From that point on, all state references reuse the new document, and the old internal logic of the original document can be entirely discarded.

The benefits of this approach are:

  1. Stronger overall correctness guarantees: we don’t need to introduce extensive internal logic changes.
  2. Reduced maintenance burden: if we modify too much internal logic, every future optimization would require re-examining this area to ensure it hasn’t been broken by new changes.
  3. Built on public APIs: most of the logic in this approach is based on public interfaces.

At the moment, the only real risk lies in the step mentioned above that migrates existing events. To handle this, we would need an additional API, something like replace_doc_inplace. As long as we can guarantee the correctness of this function, we can guarantee the correctness of the entire feature.

Overall, this approach should be more stable and easier to use, and it also gives us a function that we can reuse in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness is the highest priority, so I will take your advice. But I worry a little bit about speed--is creating a new document going to be as fast as mem::swap on the arena?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An additional quirk to address: my understanding is that the Subscription closures capture the ContainerIdx (NOT the ContainerID), so the indices in the Arena have to line up exactly if we want the user's existing Subscriptions to keep working. Is there a way around this?

// 1. Commit pending transaction
let (options, guard) = self.implicit_commit_then_stop();

// 2. Export shallow snapshot
let snapshot_bytes = crate::encoding::export_shallow_snapshot(self, frontiers)
.map_err(|e| LoroError::DecodeError(e.to_string().into()))?;

// 3. Create new arena and pre-populate it
let new_arena = SharedArena::new();
self.arena.clone_container_mappings_to(&new_arena);

// 4. Create new components from snapshot
let new_oplog = OpLog::new_with_arena(new_arena.clone());
let new_config = self.config.clone();
let lock_group = LoroLockGroup::new();
let new_state = DocState::new_arc(
std::sync::Weak::new(),
new_arena.clone(),
new_config.clone(),
&lock_group,
);

let new_inner = Arc::new(LoroDocInner {
oplog: Arc::new(lock_group.new_lock(new_oplog, LockKind::OpLog)),
state: new_state.clone(),
config: new_config,
detached: AtomicBool::new(false),
auto_commit: AtomicBool::new(false),
observer: Arc::new(Observer::new(new_arena.clone())),
diff_calculator: Arc::new(
lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
),
txn: Arc::new(lock_group.new_lock(None, LockKind::Txn)),
arena: new_arena.clone(),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
pre_commit_subs: SubscriberSetWithQueue::new(),
first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
});

let temp_doc = LoroDoc { inner: new_inner };

let parsed = crate::encoding::parse_header_and_body(&snapshot_bytes, true)?;
crate::encoding::decode_snapshot(&temp_doc, parsed.mode, parsed.body, Default::default())?;

if self.is_detached() {
temp_doc.checkout(&self.state_frontiers())?;
}

// 5. Swap components
self.arena.swap_inner_contents(&new_arena);

{
let mut oplog = self.oplog.lock().unwrap();
let mut new_oplog = temp_doc.oplog.lock().unwrap();
std::mem::swap(&mut oplog.dag, &mut new_oplog.dag);
std::mem::swap(&mut oplog.change_store, &mut new_oplog.change_store);
oplog.free_history_cache();
std::mem::swap(&mut oplog.pending_changes, &mut new_oplog.pending_changes);
std::mem::swap(
&mut oplog.uncommitted_change,
&mut new_oplog.uncommitted_change,
);
}

{
let mut state = self.state.lock().unwrap();
let mut new_state = temp_doc.state.lock().unwrap();

std::mem::swap(&mut state.frontiers, &mut new_state.frontiers);
std::mem::swap(&mut state.store, &mut new_state.store);
Comment on lines +1067 to +1071

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep ContainerStore arena aligned after replace_with_shallow

replace_with_shallow swaps state.store with the temporary document’s store but leaves state.arena untouched, after already swapping inner arena contents between self.arena and new_arena. Since ContainerStore owns its own SharedArena, the doc now has two different arena instances (state.arena vs state.store.arena). Subsequent container registrations and lookups can update only one arena, causing container index mismatches and incorrect resolution after replace_with_shallow (e.g., creating new containers or resolving paths). Consider swapping/updating the store’s arena to match state.arena or rebuilding the store in-place.

Useful? React with 👍 / 👎.

state.clear_dead_containers_cache();
}

self.free_diff_calculator();

drop(guard);
self.renew_txn_if_auto_commit(options);

Ok(())
}

/// Calculate the diff between two versions `a` and `b`, such that applying the diff to `a` makes its state the same as `b`.
///
/// NOTE: This method will make the doc enter the **detached mode**.
Expand Down Expand Up @@ -1365,15 +1448,25 @@ impl LoroDoc {
Ok(())
}

/// NOTE: The caller of this method should ensure the txn is locked and set to None
/// NOTE: The caller of this method should ensure the txn is committed and no
/// concurrent transaction can be started during this operation. This is typically
/// achieved by either:
/// 1. Holding the txn lock guard (from `implicit_commit_then_stop()`)
/// 2. Being called within `with_barrier()` which commits and holds the guard
///
/// The `is_locked()` assertion was removed because:
/// - `LoroMutex::is_locked()` uses `try_lock().is_err()` which is race-prone
/// and unreliable for same-thread lock detection
/// - Internal callers in `shallow_snapshot.rs` legitimately call this without
/// holding the txn lock directly, but are protected at the public API level
/// by `with_barrier()` in methods like `export()`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note: this comment needs review. It was generated by Claude; I believe it is correct, but I lack the expertise in this library to be 100% sure. The reason for removing the assert!(self.txn.is_locked()) assertion below is that _checkout_without_emitting does not need to be called while holding the lock when it is invoked from replace_with_shallow.

#[instrument(level = "info", skip(self))]
pub(crate) fn _checkout_without_emitting(
&self,
frontiers: &Frontiers,
to_shrink_frontiers: bool,
to_commit_then_renew: bool,
) -> Result<(), LoroError> {
assert!(self.txn.is_locked());
let from_frontiers = self.state_frontiers();
loro_common::info!(
"checkout from={:?} to={:?} cur_vv={:?}",
Expand Down
46 changes: 45 additions & 1 deletion crates/loro-internal/src/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use change_store::{BlockChangeRef, ChangeStore};
pub struct OpLog {
pub(crate) dag: AppDag,
pub(crate) arena: SharedArena,
change_store: ChangeStore,
pub(crate) change_store: ChangeStore,
history_cache: Mutex<ContainerHistoryCache>,
/// Pending changes that haven't been applied to the dag.
/// A change can be imported only when all its deps are already imported.
Expand Down Expand Up @@ -86,6 +86,22 @@ impl OpLog {
}
}

/// Builds an empty `OpLog` that uses the given `arena` and a default
/// `Configure`.
///
/// The change store is created with `ChangeStore::new_mem`; the history
/// cache and the DAG are each handed a clone of it.
#[inline]
pub(crate) fn new_with_arena(arena: SharedArena) -> Self {
    let configure = Configure::default();
    let store = ChangeStore::new_mem(&arena, configure.merge_interval_in_s.clone());
    // Build the store-backed members first, in the same order as before,
    // so `store` itself can be moved into the struct afterwards.
    let history_cache = Mutex::new(ContainerHistoryCache::new(store.clone(), None));
    let dag = AppDag::new(store.clone());

    Self {
        dag,
        arena,
        change_store: store,
        history_cache,
        pending_changes: Default::default(),
        batch_importing: false,
        configure,
        uncommitted_change: None,
    }
}

#[inline]
pub fn dag(&self) -> &AppDag {
&self.dag
Expand Down Expand Up @@ -621,6 +637,34 @@ impl OpLog {

max_timestamp
}

/// Asserts that two op logs describe the same history. Panics if not.
///
/// Both DAGs are first checked with `check_dag_correctness`, then the
/// shallow-since version vectors, frontiers and version vectors are
/// compared. Finally, for every peer in `self`'s version vector, the changes
/// in the span from the shallow start (0 when the peer has none) to the
/// peer's end counter are compared pairwise on `id`, `lamport`, `deps`,
/// `timestamp` and the number of ops. Op contents are not compared.
pub fn check_is_the_same(&self, other: &Self) {
    self.dag.check_dag_correctness();
    other.dag.check_dag_correctness();
    assert_eq!(self.shallow_since_vv(), other.shallow_since_vv());
    assert_eq!(self.frontiers(), other.frontiers());
    assert_eq!(self.vv(), other.vv());

    let version = self.vv();
    let shallow = self.shallow_since_vv();

    for (&peer, &end) in version.iter() {
        let start = shallow.get(&peer).copied().unwrap_or(0);
        // Only peers with a non-empty span have changes to compare.
        if start < end {
            let span = IdSpan::new(peer, start, end);
            let mine: Vec<_> = self.change_store.iter_changes(span).collect();
            let theirs: Vec<_> = other.change_store.iter_changes(span).collect();
            assert_eq!(mine.len(), theirs.len());
            for (lhs, rhs) in mine.iter().zip(&theirs) {
                assert_eq!(lhs.id, rhs.id);
                assert_eq!(lhs.lamport, rhs.lamport);
                assert_eq!(lhs.deps, rhs.deps);
                assert_eq!(lhs.timestamp, rhs.timestamp);
                assert_eq!(lhs.ops.len(), rhs.ops.len());
            }
        }
    }
}
}

#[derive(Debug)]
Expand Down
11 changes: 11 additions & 0 deletions crates/loro-internal/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,13 +1308,24 @@ impl DocState {
Ok(())
}

/// Empties `dead_containers_cache`.
///
/// `LoroDoc::replace_with_shallow` calls this right after swapping in the
/// new `frontiers` and `store`.
// NOTE(review): presumably the cached entries would be stale for the
// swapped-in store — confirm.
pub(crate) fn clear_dead_containers_cache(&mut self) {
self.dead_containers_cache.clear();
}

/// Asserts that this state's arena matches `other`'s arena. Panics if not.
///
/// Only the arenas are compared, by delegating to the arena's own
/// `check_is_the_same`; frontiers and container states are not inspected.
pub fn check_arena_is_the_same(&self, other: &Self) {
    self.arena.check_is_the_same(&other.arena);
}

/// Check whether two [DocState]s are the same. Panic if not.
///
/// Compared to check equality on `get_deep_value`, this function also checks the equality on richtext
/// styles and states that are not reachable from the root.
///
/// This is only used for test.
pub(crate) fn check_is_the_same(&mut self, other: &mut Self) {
assert_eq!(self.frontiers, other.frontiers, "Frontiers mismatch");
self.store.check_is_the_same(&other.store);

fn get_entries_for_state(
arena: &SharedArena,
state: &mut State,
Expand Down
11 changes: 11 additions & 0 deletions crates/loro-internal/src/state/container_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ impl ContainerStore {
}
}
}

/// Asserts that both stores agree on their shallow root. Panics if not.
///
/// Only `shallow_root_store` is inspected: both sides must either lack one,
/// or have one with equal `shallow_root_frontiers`. Nothing else in the
/// store is compared here.
pub fn check_is_the_same(&self, other: &Self) {
    let mine = self.shallow_root_store.as_ref();
    let theirs = other.shallow_root_store.as_ref();

    // One side having a shallow root while the other does not is a mismatch.
    if mine.is_some() != theirs.is_some() {
        panic!("shallow_root_store mismatch");
    }

    if let (Some(a), Some(b)) = (mine, theirs) {
        assert_eq!(a.shallow_root_frontiers, b.shallow_root_frontiers);
    }
}
}

#[cfg(test)]
Expand Down
Loading
Loading