feat(storage): add Batch trait abstraction for atomic write operations #231
Conversation
📝 Walkthrough
Adds a batch abstraction (public …)
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Client
    participant Redis
    participant Batch as Batch (RocksBatch/BinlogBatch)
    participant RocksDB
    Client->>Redis: write command (e.g., HSET/ZADD/LPUSH)
    Redis->>Redis: prepare encoded keys & metadata
    Redis->>Batch: create_batch()
    Redis->>Batch: put/delete per CF (Meta/Data/Score ...)
    Redis->>Batch: commit()
    alt RocksBatch (standalone)
        Batch->>RocksDB: db.write_opt(WriteBatch, WriteOptions)
        RocksDB-->>Batch: write result
    else BinlogBatch (cluster)
        Batch-->>Redis: (TODO: serialize/send via Raft/log)
    end
    Batch-->>Redis: commit result
    Redis-->>Client: response
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/src/redis_sets.rs (1)
2182-2252: `SDIFFSTORE`/`*STORE` operations are no longer atomic (two commits). At Line 2239-2247 (and similarly Line 2362-2375), the code commits a "clear destination" batch, then calls `self.sadd(...)`, which commits again. That creates an observable intermediate state (destination empty/partially written) and breaks the typical atomic semantics expected from `*STORE`.
Suggestion: build one batch that (1) deletes old destination meta + members and (2) writes the new meta + members, then commit once. If you want to reuse `sadd`, consider adding an internal helper like `sadd_into_batch(&mut dyn Batch, ...)` that doesn't commit.
Also applies to: 2309-2377
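A minimal sketch of what such a helper could look like, assuming the `MemberDataKey`/`BaseDataValue` encoders and CF names quoted elsewhere in this review; the destination meta update is left to the caller:

```rust
// Hypothetical internal helper: stage member writes into an existing batch without
// committing, so *STORE commands can clear + repopulate the destination in one commit.
fn sadd_into_batch(
    &self,
    batch: &mut dyn Batch,
    key: &[u8],
    version: u64,
    members: &[&[u8]],
) -> Result<()> {
    for member in members {
        let data_key = MemberDataKey::new(key, version, *member);
        // Set members carry an empty payload here (assumption for the sketch).
        let data_value = BaseDataValue::new(Vec::new());
        batch.put(
            ColumnFamilyIndex::SetsDataCF,
            &data_key.encode()?,
            &data_value.encode(),
        )?;
    }
    Ok(())
}
```

With a helper like this, `sdiffstore`/`sinterstore`/`sunionstore` could stage the destination clear, the new meta, and the new members into one batch and call `commit()` once.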
🤖 Fix all issues with AI agents
In @src/storage/src/batch.rs:
- Around line 47-79: The Batch trait currently exposes put/delete as infallible
and RocksBatch implementation uses assert!/expect! which can panic; change the
trait signatures for put and delete to return Result<(), ErrorType> (or make
them return Result<()> using the crate's common error type), update
RocksBatch::{put, delete} to validate column family lookup without assert/expect
and return Err(...) on invalid CF or other failures, and propagate/store any
internal errors so that commit(self: Box<Self>) returns those errors instead of
panicking; update all call sites to handle the new Result return values and
ensure commit still returns Result<()> with any accumulated error.
- Around line 38-45: Run rustfmt on the new module to resolve the formatting
warning: run `cargo fmt` (or apply rustfmt) for src/storage/src/batch.rs so the
use/import block is properly ordered and spaced (std::sync::Arc;
rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions}; snafu::ResultExt;
crate::error::{Result, RocksSnafu}; crate::ColumnFamilyIndex; engine::Engine).
Ensure no extra blank lines or misaligned imports remain so CI formatting check
passes.
- Around line 166-224: BinlogBatch::commit currently returns Ok(()) while doing
nothing; change it to return an explicit not-implemented error (e.g.,
Err(Error::unimplemented or a suitable crate::error::Error variant) from the
commit method) so callers (including create_batch when it may return
BinlogBatch) cannot acknowledge writes that aren’t persisted; update the commit
implementation in the BinlogBatch impl to construct and return that explicit
error and keep the method body otherwise unchanged until Raft append logic is
implemented.
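Illustrating the first prompt above (fallible `put`/`delete`), a hedged sketch of a `RocksBatch::put` that validates the column family instead of panicking; the `wb`/`cf_handles` field names and the `BatchSnafu` selector are assumptions based on snippets quoted later in this review:

```rust
fn put(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8], value: &[u8]) -> Result<()> {
    let idx = cf_idx as usize;
    // Return an error on an invalid CF index rather than panicking via assert!/expect.
    let cf = self.cf_handles.get(idx).ok_or_else(|| {
        BatchSnafu {
            message: format!(
                "Column family index {} out of bounds (max: {})",
                idx,
                self.cf_handles.len().saturating_sub(1)
            ),
        }
        .build()
    })?;
    self.wb.put_cf(cf, key, value);
    Ok(())
}
```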
🧹 Nitpick comments (5)
src/storage/src/redis_strings.rs (1)
2107-2151: Potentially unbounded in-memory key collection for `DEL`/`FLUSHDB` paths. At Line 2110-2151 and 2226-2262, keys are collected into a `Vec` and then deleted via one batch commit. For large DBs this can spike memory and produce very large `WriteBatch`es. Consider chunking: delete/commit every N keys (or stream deletes directly into a batch and commit when `batch.count()` reaches a threshold).
Also applies to: 2226-2262
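A rough sketch of that chunked variant, assuming the `Batch` trait from this PR; the chunk size, the key source, and the CF choice are placeholders:

```rust
// Hypothetical chunked deletion: commit and start a fresh batch every CHUNK keys
// instead of accumulating one huge WriteBatch in memory.
const CHUNK: u32 = 10_000;

fn delete_keys_chunked<I>(redis: &Redis, keys: I) -> Result<()>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut batch = redis.create_batch()?;
    for key in keys {
        // MetaCF is illustrative; real callers would pick the CF per key type.
        batch.delete(ColumnFamilyIndex::MetaCF, &key)?;
        if batch.count() >= CHUNK {
            batch.commit()?; // flush this chunk
            batch = redis.create_batch()?;
        }
    }
    batch.commit() // flush the tail
}
```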
src/storage/src/redis_lists.rs (2)
319-349: `lpop`/`rpop` now apply deletes + meta update in one batch — good. The new `keys_to_delete` collection and single batch commit (Line 341-349, 412-420, 483-491) is consistent.
Also applies to: 390-420, 461-491
754-806: Batch delete/put loops are correct; consider writing directly into batch to reduce allocations. Several paths build `Vec<Vec<u8>>` and `Vec<(Vec<u8>, Vec<u8>)>` first (e.g., Line 754-806, 887-937, 1032-1073). Where feasible, you can push operations directly into `batch` as you compute them to avoid duplicating key/value buffers.
Also applies to: 887-937, 1032-1073
src/storage/src/redis_hashes.rs (2)
112-118: Formatting issue flagged by CI. The batch operations are correct, but the CI cargo fmt check indicates formatting differences. Run `cargo fmt` to fix.
🧹 Run formatter: `cargo fmt --all`
289-303: Helper closure pattern works but creates some duplication. The `create_new_hash` closure is duplicated across `hset`, `hmset`, `hsetnx`, `hincrby`, and `hincrbyfloat`. While the closures capture a method-specific `base_meta_key`, consider extracting to a shared method that takes `base_meta_key` as a parameter to reduce duplication. This is a nice-to-have refactor for the future.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- src/storage/src/batch.rs
- src/storage/src/lib.rs
- src/storage/src/redis.rs
- src/storage/src/redis_hashes.rs
- src/storage/src/redis_lists.rs
- src/storage/src/redis_sets.rs
- src/storage/src/redis_strings.rs
- src/storage/src/redis_zsets.rs
🧰 Additional context used
🧬 Code graph analysis (5)
src/storage/src/redis_sets.rs (1)
- src/storage/src/batch.rs (2): new (102-114), new (186-188)
src/storage/src/redis_strings.rs (1)
- src/storage/src/batch.rs (2): new (102-114), new (186-188)
src/storage/src/redis_lists.rs (2)
- src/storage/src/batch.rs (1): new (102-114)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
src/storage/src/redis_hashes.rs (1)
- src/storage/src/batch.rs (2): new (102-114), new (186-188)
src/storage/src/redis_zsets.rs (1)
- src/storage/src/batch.rs (2): new (102-114), new (186-188)
🪛 GitHub Check: cargo fmt (ubuntu-latest)
src/storage/src/batch.rs
[warning] 40-40:
Diff in /home/runner/work/kiwi/kiwi/src/storage/src/batch.rs
src/storage/src/redis_hashes.rs
[warning] 113-113:
Diff in /home/runner/work/kiwi/kiwi/src/storage/src/redis_hashes.rs
[warning] 297-297:
Diff in /home/runner/work/kiwi/kiwi/src/storage/src/redis_hashes.rs
[warning] 622-622:
Diff in /home/runner/work/kiwi/kiwi/src/storage/src/redis_hashes.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: cargo clippy (macos-latest)
- GitHub Check: cargo clippy (windows-latest)
- GitHub Check: build and test (macos-latest)
- GitHub Check: build and test (windows-latest)
- GitHub Check: build and test (ubuntu-latest)
- GitHub Check: cargo clippy (ubuntu-latest)
- GitHub Check: lint
- GitHub Check: test-macos
- GitHub Check: test-windows
- GitHub Check: test
🔇 Additional comments (19)
src/storage/src/lib.rs (1)
42-43: Batch module wiring + re-export looks good.
`mod batch;` (Line 42) and `pub use batch::{Batch, BinlogBatch, RocksBatch};` (Line 69) are consistent with the new abstraction.
Also applies to: 69-70
src/storage/src/redis_strings.rs (1)
318-325: Batch refactor for string writes is consistent and improves atomicity. The `create_batch() -> put/delete -> commit()` pattern applied across setters (e.g., Line 581-588, 635-642, 1063-1079) is consistent and keeps write paths uniform.
Also applies to: 389-396, 581-588, 635-642, 698-705, 759-766, 831-838, 1063-1079
src/storage/src/redis_sets.rs (1)
151-166: Batch usage in set mutators (sadd/srem/spop/smove) looks coherent. The conversions to `create_batch()` with `MetaCF` + `SetsDataCF` operations and a single `commit()` (e.g., Line 151-166, 494-512, 783-813) match the intended abstraction.
Also applies to: 494-512, 627-647, 783-813
src/storage/src/redis_lists.rs (1)
105-108: List `push_core()` batching approach is clean and keeps meta+data updates in one commit. Collecting deletes/puts and committing them together (Line 236-246) aligns well with the new Batch abstraction and should preserve atomicity for each list mutation.
Also applies to: 236-246
src/storage/src/redis_zsets.rs (9)
30-30: Import cleanup looks good. The removal of `WriteBatch` from the import is consistent with the migration to the new batch abstraction.
99-166: Batch abstraction correctly applied for existing zset updates. The migration from direct `WriteBatch` to the new batch abstraction maintains atomicity - all score key deletions, member/score insertions, and meta updates are committed together.
169-189: New zset creation path correctly uses batch abstraction. The batch lifecycle (create → put → commit) is properly implemented for the new zset creation path.
357-376: Score increment batch operations are correct. The batch properly handles the atomic update of: delete old score key → put new member value → put new score key → commit.
791-843: Member removal batch operations are well-structured. The batch correctly handles multi-member deletion with proper meta update logic (delete meta if count reaches zero, otherwise update count). Statistics update is appropriately placed after successful commit.
1510-1528: Lex range removal uses correct batch pattern. The collect-then-batch approach is efficient, and all deletions plus meta updates are atomic.
1629-1647: Rank range removal batch operations are correct. Consistent pattern with other range removal methods.
1728-1746: Score range removal batch operations are correct. Consistent and correct batch usage pattern.
1177-1240: Two-phase batch approach for store operations is acceptable. The destination cleanup uses one batch (lines 1197-1227), then results are added via `zadd`, which uses its own batch. This maintains atomicity within each phase. The lock is correctly released before calling `zadd` to prevent deadlock. Note: in edge cases where the first batch commits but `zadd` fails, the destination would be cleared but not populated. This matches Redis behavior where a failure partway through leaves partial state.
src/storage/src/redis_hashes.rs (6)
23-23: Import cleanup is consistent with batch migration. Removed the `WriteBatch` import as expected for the batch abstraction migration.
335-339: Batch operations for stale hash reinit are correct. The batch properly updates both meta and data in a single atomic commit.
696-704: Conditional meta update is correct but could be simplified. The meta is only updated when `new_fields_count > 0`, which is correct. However, the batch is always created and committed even when no new fields are added (just updates to existing fields). This is fine for correctness but creates a batch commit for updates-only scenarios.
903-914: Integer overflow protection and batch operation are correct. The `checked_add` prevents overflow, and the batch commit properly persists the incremented value.
1032-1046: Float overflow protection and batch operation are correct. The `is_finite()` check properly guards against NaN/Infinity results before committing.
774-778: HSETNX batch operations are correct. Atomic commit of meta and data for stale hash reinitialization.
0531ae0 to b86f898
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/src/redis_sets.rs (1)
2206-2276: Atomicity regression risk in `*STORE`: destination clear and repopulate happen in multiple commits.
`sdiffstore()` (and `clear_and_store_set()` used by `sinterstore`/`sunionstore`) currently:
- deletes existing destination keys and commits
- calls `sadd()`, which commits again
This opens race windows (and intermediate visibility) and can violate "single command is atomic" expectations, especially since reads typically don't take locks.
Suggested fix: do clear + repopulate in one batch commit
```diff
-        // Write the batch to clear destination first
-        let mut batch = self.create_batch()?;
-        ...
-        batch.commit()?;
-
-        // Now add the new members
-        let added = self.sadd(destination, &member_refs)?;
-        Ok(added)
+        // Clear + write new meta + write new members in ONE batch
+        let mut batch = self.create_batch()?;
+        for key_to_del in &keys_to_delete {
+            batch.delete(ColumnFamilyIndex::SetsDataCF, key_to_del)?;
+        }
+        if should_delete_meta {
+            batch.delete(ColumnFamilyIndex::MetaCF, &dest_base_meta_key)?;
+        }
+
+        // Build and write destination meta + members here (avoid calling sadd())
+        // - create new BaseMetaValue(DataType::Set) with count=members.len()
+        // - get version from ParsedSetsMetaValue::initial_meta_value()
+        // - for each member: MemberDataKey::new(destination, version, member).encode() and batch.put(...)
+
+        batch.commit()?;
+        Ok(members.len() as i32)
```
Also applies to: 2333-2401
🤖 Fix all issues with AI agents
In @src/storage/src/redis.rs:
- Around line 320-360: The code relies on positional indexing of self.handles
(via get_cf_handle/ColumnFamilyIndex) but initialization used filter() which can
remove missing CFs and shift positions, breaking the mapping; change the
implementation to use a stable name-based mapping instead: during DB open
populate a fixed-size Vec<Option<Arc<rocksdb::BoundColumnFamily<'_>>>> or a
HashMap<ColumnFamilyIndex, Arc<...>> keyed by ColumnFamilyIndex (or CF name) so
missing CFs are represented as None rather than removed, update get_cf_handle to
return the handle by lookup (not by index into a filtered Vec), and ensure
create_batch collects handles by calling the new get_cf_handle for each
ColumnFamilyIndex (MetaCF, HashesDataCF, SetsDataCF, ListsDataCF, ZsetsDataCF,
ZsetsScoreCF) so ordering/invariants are preserved.
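A hedged sketch of the index-keyed lookup this prompt describes (the type and field names here are illustrative, not the actual `Redis` fields):

```rust
use std::sync::Arc;

// One slot per ColumnFamilyIndex discriminant; a CF that failed to open stays None
// instead of shifting every later handle's position.
struct CfHandles<'db> {
    slots: Vec<Option<Arc<rocksdb::BoundColumnFamily<'db>>>>,
}

impl<'db> CfHandles<'db> {
    fn get(&self, idx: ColumnFamilyIndex) -> Option<&Arc<rocksdb::BoundColumnFamily<'db>>> {
        self.slots.get(idx as usize).and_then(|slot| slot.as_ref())
    }
}
```

`create_batch()` can then collect handles by calling this lookup for each `ColumnFamilyIndex` variant, so a missing CF surfaces as an error instead of a silently shifted mapping.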
🧹 Nitpick comments (4)
src/storage/src/batch.rs (1)
95-199: RocksBatch CF index validation is good; consider tightening diagnostics + avoiding repeated custom Location building.
- The "max" in the bounds error reads like an index but is currently `len`; consider `len.saturating_sub(1)`.
- You can simplify the error location to `snafu::location!()` (and keep the message).
src/storage/src/redis_lists.rs (1)
718-806: Nice consolidation to single-commit mutations; consider avoiding pre-staging when not needed. Several paths build `Vec`s of keys/puts first and then replay into `batch.*`. If borrow/lifetime constraints permit, writing directly into `batch` as you compute keys would save memory and copies on large lists.
Also applies to: 887-937, 1032-1073
src/storage/src/redis_strings.rs (1)
2110-2151: Potential memory spike: del_key/flush_db collect all keys before deleting. For large datasets, `keys_to_delete` can become huge. Consider chunked deletion (e.g., commit every N ops and `batch.clear()`), or implement a purpose-built "delete range/prefix" API in the engine later.
Also applies to: 2226-2261
src/storage/src/redis_hashes.rs (1)
294-314: Consider extracting repeated helper pattern. The `create_new_hash` helper closure appears with minor variations across multiple methods (also in `hsetnx` at lines 780-800, `hincrby` at lines 905-926, and `hincrbyfloat` at lines 1061-1082). While the current implementation works correctly and maintains atomicity, extracting this to a shared generic helper method could reduce duplication.
Example consolidation approach
A shared method could accept a value encoder:
```rust
fn create_new_hash_with_field<F>(
    &self,
    key: &[u8],
    base_meta_key: &[u8],
    field: &[u8],
    encode_value: F,
) -> Result<()>
where
    F: FnOnce() -> Vec<u8>,
{
    let mut hashes_meta = HashesMetaValue::new(Bytes::copy_from_slice(&1u64.to_le_bytes()));
    hashes_meta.inner.data_type = DataType::Hash;
    let version = hashes_meta.update_version();
    let data_key = MemberDataKey::new(key, version, field);
    let data_value = BaseDataValue::new(encode_value());
    let mut batch = self.create_batch()?;
    batch.put(ColumnFamilyIndex::MetaCF, base_meta_key, &hashes_meta.encode())?;
    batch.put(
        ColumnFamilyIndex::HashesDataCF,
        &data_key.encode()?,
        &data_value.encode(),
    )?;
    batch.commit()
}
```
Then call sites would become:
```rust
create_new_hash_with_field(key, &base_meta_key, field, || value.to_string().into_bytes())?
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- src/storage/src/batch.rs
- src/storage/src/lib.rs
- src/storage/src/redis.rs
- src/storage/src/redis_hashes.rs
- src/storage/src/redis_lists.rs
- src/storage/src/redis_sets.rs
- src/storage/src/redis_strings.rs
- src/storage/src/redis_zsets.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/storage/src/lib.rs
- src/storage/src/redis_zsets.rs
🧰 Additional context used
🧬 Code graph analysis (5)
src/storage/src/redis_sets.rs (1)
- src/storage/src/batch.rs (2): new (113-125), new (221-223)
src/storage/src/redis_lists.rs (2)
- src/storage/src/batch.rs (1): new (113-125)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
src/storage/src/redis_strings.rs (1)
- src/storage/src/batch.rs (2): new (113-125), new (221-223)
src/storage/src/redis_hashes.rs (1)
- src/storage/src/batch.rs (2): new (113-125), new (221-223)
src/storage/src/batch.rs (2)
- src/storage/src/redis.rs (1): new (115-149)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test-windows
- GitHub Check: cargo clippy (macos-latest)
🔇 Additional comments (13)
src/storage/src/batch.rs (2)
47-90: Batch trait shape looks good for “commit consumes batch” flow.
`commit(self: Box<Self>)` aligns with `Redis::create_batch() -> Box<dyn Batch>` and avoids post-commit reuse footguns.
201-265: BinlogBatch correctly fails commit to avoid silent data loss. Returning an error from `commit()` is the right default until Raft integration lands.
src/storage/src/redis_lists.rs (2)
105-246: push_core batching integration looks consistent (data CF ops + meta update in one commit). Order of operations (deletes, puts, meta put, commit) is coherent and keeps the metadata update in the same atomic commit.
317-349: Pop paths correctly batch element deletions with metadata update. Keeping meta written even when empty matches the comment/intent and avoids breaking lpushx/rpushx flows.
Also applies to: 388-420, 459-491
src/storage/src/redis_strings.rs (1)
318-325: Batch-based writes are consistently applied across string mutations. These conversions preserve "single logical operation → single commit" and fit the new Batch abstraction well.
Also applies to: 389-396, 581-588, 635-642, 698-705, 759-766, 831-838, 1063-1079, 1137-1153, 1212-1219, 1284-1291, 1836-1884, 1938-1945, 1979-1986
src/storage/src/redis_sets.rs (1)
151-169: Core set mutations migrated cleanly to Batch (single commit). These paths keep metadata + member updates together and look correct.
Also applies to: 472-520, 634-659, 795-837
src/storage/src/redis_hashes.rs (7)
86-122: LGTM! Efficient collect-then-batch pattern. The refactoring correctly implements the batch abstraction with a two-phase approach: first collecting keys that need deletion (lines 86-99), then performing all operations in a single atomic batch (lines 112-121). This ensures consistency between the metadata count and actual deletions.
346-407: LGTM! Batch operations correctly handle all branches. The batch abstraction is properly applied across all three logical branches:
- Stale/empty hash initialization (lines 346-357)
- Existing field updates (lines 376-382)
- New field additions (lines 396-407)
Each branch maintains atomicity guarantees and correctly coordinates metadata updates with data changes.
648-664: LGTM! Efficient multi-field batch pattern. The helper correctly consolidates all field-value pairs into a single batch operation, writing the metadata once followed by all data entries. This is more efficient than per-field batches and maintains atomicity for the entire multi-set operation.
716-755: LGTM! Two-phase batch strategy for existing hashes. The collect-then-batch pattern (lines 716-730 collect, lines 743-754 batch) correctly:
- Checks field existence before batching to determine new field count
- Accumulates all writes before committing
- Conditionally updates metadata only when new fields are added
- Commits everything atomically in a single batch
832-876: LGTM! Batch operations correctly implement set-if-not-exists semantics. Both the stale hash reinitialization (lines 832-843) and new field addition (lines 865-876) paths correctly use atomic batch operations to coordinate metadata and data updates.
959-1032: LGTM! Atomic increment operations with proper overflow handling. The batch-based implementation correctly handles:
- Integer overflow detection before committing (line 993)
- Atomic updates for existing fields (lines 1000-1006)
- Coordinated meta+data writes for new fields (lines 1021-1032)
All branches maintain transactional consistency through the batch abstraction.
1115-1191: LGTM! Atomic float increment with proper validation. The implementation correctly:
- Validates float results are finite before committing (lines 1148-1154)
- Uses atomic batch updates for existing fields (lines 1159-1165)
- Coordinates meta+data writes for new fields (lines 1180-1191)
The batch abstraction maintains consistency across all paths.
b86f898 to 5b92f55
arana-db#222) This commit introduces a Batch trait to abstract batch write operations, enabling support for both standalone (RocksDB) and future cluster (Raft) modes.
Changes:
- Add batch.rs module with Batch trait, RocksBatch, and BinlogBatch implementations
- Add create_batch() method to Redis struct for creating batch instances
- Refactor redis_strings.rs to use Batch trait (2 places)
- Refactor redis_hashes.rs to use Batch trait (6 places)
- Refactor redis_lists.rs to use Batch trait (7 places)
- Refactor redis_sets.rs to use Batch trait (6 places)
- Refactor redis_zsets.rs to use Batch trait (8 places)
- Add explicit error handling for invalid column family index
This abstraction allows seamless switching between direct RocksDB writes and Raft consensus-based writes in the future.
5b92f55 to 0fd2c70
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (10)
src/storage/src/redis_zsets.rs (2)
59-65: Fix score equality check in `zadd` (currently compares the wrong value). This condition is wrong: it checks `abs(sm.score)` instead of `abs(existing_score - sm.score)`, so it will "skip" updates incorrectly when `sm.score` is near 0.
Proposed fix
```diff
-                    if (existing_score, sm.score).1.abs() < f64::EPSILON {
+                    if (existing_score - sm.score).abs() < f64::EPSILON {
                         // Score is the same, skip
                         continue;
                     } else {
```
Also applies to: 99-180, 188-213
1234-1265: `zset_store_operation`: delete+recreate is not atomic vs concurrent writers (lock gap + two commits). You delete destination in one commit, then (after releasing the lock) call `zadd()`, which does a second commit. A concurrent writer can observe/modify the destination in-between. Consider an internal "zadd without lock" (called while holding the destination lock) or building the destination contents in the same batch/commit.
src/storage/src/redis_sets.rs (3)
472-520: Use `saturating_sub` when decrementing set counts. If metadata is inconsistent/corrupted, `set_meta.count() - removed_count` can underflow. Other modules already use `saturating_sub`.
Proposed fix
```diff
-        let new_count = set_meta.count() - removed_count as u64;
+        let new_count = set_meta.count().saturating_sub(removed_count as u64);
```
2206-2271: `sdiffstore`: destination is cleared without a destination lock (race vs concurrent ops). You clear destination keys/meta via batch without holding the destination key lock, then later `sadd()` takes the lock. This breaks the per-key lock contract used elsewhere (read-modify-write ops won't synchronize with the clear).
Sketch fix (lock destination during clear phase)
```diff
 pub fn sdiffstore(&self, destination: &[u8], keys: &[&[u8]]) -> Result<i32> {
+    let dest_str = String::from_utf8_lossy(destination).to_string();
+    let _dest_lock = ScopeRecordLock::new(self.lock_mgr.as_ref(), &dest_str);
     ...
 }
```
2333-2401: `clear_and_store_set`: Fix self-deadlock caused by nested lock acquisition. `clear_and_store_set` holds a `ScopeRecordLock` on the destination key, then calls `self.sadd(destination, &member_refs)`, which attempts to acquire the same lock again. Since `parking_lot::Mutex` is not re-entrant, this causes a self-deadlock. Refactor to either:
- Extract the core `sadd` logic into a separate internal method that does not acquire the lock, then call it from both `sadd` and `clear_and_store_set` while holding the lock once
- Perform the `sadd` operations directly within `clear_and_store_set` without calling `sadd`
src/storage/src/redis_strings.rs (3)
1106-1155: `msetnx` existence check is not type-agnostic (can overwrite live non-string keys). The loop checks only `BaseKey` (string) entries in MetaCF. If a key exists as hash/set/zset/list (stored under `BaseMetaKey`), `msetnx` can incorrectly proceed and overwrite it—contradicting the comment "any live key (any type) blocks the batch".
One possible fix: reuse `get_key_type` for existence
```diff
 pub fn msetnx(&self, kvs: &[(Vec<u8>, Vec<u8>)]) -> Result<bool> {
-    let db = self.db.as_ref().context(OptionNoneSnafu {
-        message: "db is not initialized".to_string(),
-    })?;
-
-    let _cf = self
-        .get_cf_handle(ColumnFamilyIndex::MetaCF)
-        .context(OptionNoneSnafu {
-            message: "cf is not initialized".to_string(),
-        })?;
-
-    // Check if any key exists and is not expired
-    for (key, _) in kvs {
-        let string_key = BaseKey::new(key);
-
-        match db
-            .get_opt(&string_key.encode()?, &self.read_options)
-            .context(RocksSnafu)?
-        {
-            Some(val) => {
-                let string_value = ParsedStringsValue::new(&val[..])?;
-                if !string_value.is_stale() {
-                    return Ok(false);
-                }
-            }
-            None => {}
-        }
-    }
+    // Check if any *live* key exists (any type blocks MSETNX, Redis-compatible).
+    for (key, _) in kvs {
+        if self.get_key_type(key).is_ok() {
+            return Ok(false);
+        }
+    }
```
2226-2261: `flush_db` accumulates unbounded batches on large databases; implement chunked deletes (commit every N keys) or add `delete_range` support to the Batch trait. The current implementation iterates all keys across multiple column families, collects them into a single `Vec`, then adds all delete operations to a single `WriteBatch` before committing. On large databases, this causes:
- Unnecessary memory allocation for the entire key vector
- Single massive write batch with thousands/millions of operations
- Increased WAL overhead and compaction pressure
RocksDB 0.23.0 provides `delete_range_cf()` for efficient range deletes, but it's not exposed by the `Batch` trait. Chunking deletes into smaller batches (commit every 10k–100k keys) would be a low-risk fix; alternatively, extend the `Batch` trait to support `delete_range()`.
2110-2151: Fix prefix-scan logic and limit batch accumulation in `del_key`. The current implementation has two issues:
Incorrect prefix matching: `BaseKey::encode()` includes the 16-byte `reserve2` suffix, but composite keys (HashesDataCF, SetsDataCF, etc.) store `version` bytes immediately after the encoded user key. The check `k.starts_with(&encoded)` will not match these keys because the byte sequence after the user key is `[version][data]`, not `[reserve2]`. Use `encode_seek_key()` or a prefix without `reserve2` instead (see `redis_hashes.rs:225` for the correct pattern).
Unbounded memory accumulation: Collecting all matched keys into `keys_to_delete` before deletion can exhaust memory for large composite types. Build and commit the batch incrementally instead (see `redis_multi.rs:462-468` for the pattern).
src/storage/src/redis_hashes.rs (1)
86-122: Fix clippy/lint failures: avoid `&temp_vec()` / needless borrow-to-deref in `batch.put(...)`. CI is failing with "this expression creates a reference which is immediately dereferenced" at Line 119/350/400/698/751/836/869/963/1025/1119. The typical trigger here is passing `&some_vec` (or `&some_fn_returning_vec()`) where `&[u8]` is expected. Bind encoded values and pass slices.
Concrete fix pattern (apply to all reported lines)
```diff
-        batch.put(
-            ColumnFamilyIndex::MetaCF,
-            &base_meta_key,
-            &meta_val.encoded(),
-        )?;
+        let encoded_meta = meta_val.encoded();
+        batch.put(
+            ColumnFamilyIndex::MetaCF,
+            base_meta_key.as_slice(),
+            encoded_meta.as_slice(),
+        )?;
-        batch.put(
-            ColumnFamilyIndex::MetaCF,
-            &base_meta_key,
-            &hashes_meta.encode(),
-        )?;
+        let encoded_meta = hashes_meta.encode();
+        batch.put(
+            ColumnFamilyIndex::MetaCF,
+            base_meta_key.as_slice(),
+            encoded_meta.as_slice(),
+        )?;
-        batch.put(
-            ColumnFamilyIndex::HashesDataCF,
-            &data_key.encode()?,
-            &data_value.encode(),
-        )?;
+        let encoded_key = data_key.encode()?;
+        let encoded_val = data_value.encode();
+        batch.put(
+            ColumnFamilyIndex::HashesDataCF,
+            encoded_key.as_slice(),
+            encoded_val.as_slice(),
+        )?;
```
Also applies to: 293-314, 346-358, 376-383, 395-408, 637-755, 779-877, 904-1033, 1060-1192
src/storage/src/redis_lists.rs (1)
1080-1154: `rpoplpush` is not crash-atomic (two commits); consider a single batch commit spanning both keys. Even with both key locks held, `rpop_internal` commits first and `push_core` commits second; a crash between them can lose/misplace an element. With the new Batch abstraction, it should be possible to accumulate deletes/puts for both source+destination into one batch and commit once.
Also applies to: 364-429, 43-255
🤖 Fix all issues with AI agents
In @src/storage/src/batch.rs:
- Around line 56-90: The out-of-bounds error messages for ColumnFamilyIndex use
self.cf_handles.len() as the "max index" but the true maximum valid index is
self.cf_handles.len() - 1; update the error construction in the code paths that
validate cf_idx (referencing cf_handles and the Redis::create_batch() usage) to
report the maximum as self.cf_handles.len().saturating_sub(1) (or handle empty
cf_handles explicitly) so the message shows the correct highest valid index
instead of the length.
- Around line 201-264: The code relies on commented-out selection logic and can
accidentally return a BinlogBatch; add an explicit guard in Redis and
create_batch(): add a bool field like `cluster_mode` to the `Redis` struct (or
use a feature flag), then update `create_batch()` to only construct/return
`BinlogBatch` when `cluster_mode` is true and the required Raft wiring (e.g.,
append log callback) is present; otherwise always return `RocksBatch` and if
`cluster_mode` is true but append-log wiring is missing, return an immediate
error or panic rather than creating a BinlogBatch (leave `BinlogBatch::commit()`
intentional error as a secondary safeguard).
In @src/storage/src/redis_lists.rs:
- Around line 341-349: The code inconsistently handles empty lists:
lpop/rpop/rpop_internal currently preserve list metadata when the list becomes
empty while ltrim/lrem delete it, causing differing "key exists" and
lpushx/rpushx behavior; make this consistent by changing the ltrim and lrem code
paths to preserve metadata instead of deleting the MetaCF entry when a list
becomes empty (or alternatively change lpop/rpop to delete if you prefer that
semantics), updating the batch operations in the affected functions (ltrim,
lrem, and their internal helpers) so they write the parsed_meta.value() to
ColumnFamilyIndex::MetaCF rather than issuing batch.delete, and ensure the same
approach is applied to all referenced code paths (including the rpop_internal
and any other empty-list branches) so key-exists semantics match across
operations.
In @src/storage/src/redis_strings.rs:
- Around line 1063-1079: The mset implementation currently batches puts and
commits without acquiring per-key locks, which can race with ops that rely on
ScopeRecordLock (e.g., incr_decr); modify the mset function to first collect and
sort the keys (use BaseKey::new(key).encode() or the key strings), acquire a
ScopeRecordLock for each key in that sorted order (holding all locks), then
perform create_batch()/batch.put(...) for each kv and finally batch.commit(),
releasing locks after commit; ensure lock acquisition is exception-safe (release
on error) and reference ScopeRecordLock, mset, create_batch, batch.put, and
batch.commit in your change.
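A hedged sketch of that flow; `ScopeRecordLock`/`lock_mgr` follow the names quoted in this review, while the value encoding and exact signatures are assumptions:

```rust
pub fn mset(&self, kvs: &[(Vec<u8>, Vec<u8>)]) -> Result<()> {
    // Sort (and dedup) lock keys so every command acquires per-key locks in the
    // same order, avoiding lock-order inversion with incr_decr and friends.
    let mut lock_keys: Vec<String> = kvs
        .iter()
        .map(|(k, _)| String::from_utf8_lossy(k).to_string())
        .collect();
    lock_keys.sort();
    lock_keys.dedup();

    // Guards release on drop, including on any error path below.
    let _guards: Vec<_> = lock_keys
        .iter()
        .map(|k| ScopeRecordLock::new(self.lock_mgr.as_ref(), k))
        .collect();

    let mut batch = self.create_batch()?;
    for (key, value) in kvs {
        let string_key = BaseKey::new(key);
        // Value encoding (StringsValue with TTL metadata) elided; raw bytes stand in here.
        batch.put(ColumnFamilyIndex::MetaCF, &string_key.encode()?, value)?;
    }
    batch.commit()
}
```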
🧹 Nitpick comments (3)
src/raft/src/network.rs (1)
386-397: Extract duplicated message type computation into a helper method. The match expression that computes `msg_type` is duplicated between `add_authentication` and `verify_authentication`. This violates the DRY principle and creates a maintenance risk—if the message type representation needs to change, both locations must be updated consistently.
♻️ Proposed refactor to eliminate duplication
Add a private helper method to `MessageEnvelope`:
```rust
impl MessageEnvelope {
    /// Get the message type representation for HMAC computation
    fn message_type_for_hmac(&self) -> String {
        match &self.message {
            RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
            RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
            RaftMessage::Vote(_) => "Vote".to_string(),
            RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
            RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
            RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
            RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
            RaftMessage::HeartbeatResponse { from, success } => {
                format!("HeartbeatResponse:{}:{}", from, success)
            }
        }
    }
}
```
Then update both methods to use the helper:
```diff
 pub fn add_authentication(&mut self, auth: &NodeAuth) -> RaftResult<()> {
-    // Use a simplified message representation for HMAC
-    let msg_type = match &self.message {
-        RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
-        RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
-        RaftMessage::Vote(_) => "Vote".to_string(),
-        RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
-        RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
-        RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
-        RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
-        RaftMessage::HeartbeatResponse { from, success } => {
-            format!("HeartbeatResponse:{}:{}", from, success)
-        }
-    };
+    let msg_type = self.message_type_for_hmac();
     let data_for_hmac = format!(
         "{}:{}:{}:{}:{}",
         self.message_id, self.from, self.to, self.timestamp, msg_type
```
```diff
 pub fn verify_authentication(&self, auth: &NodeAuth) -> bool {
     if let Some(ref expected_hmac) = self.hmac {
-        // Recreate the same data format used for HMAC generation
-        let msg_type = match &self.message {
-            RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
-            RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
-            RaftMessage::Vote(_) => "Vote".to_string(),
-            RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
-            RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
-            RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
-            RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
-            RaftMessage::HeartbeatResponse { from, success } => {
-                format!("HeartbeatResponse:{}:{}", from, success)
-            }
-        };
+        let msg_type = self.message_type_for_hmac();
         let data_for_hmac = format!(
             "{}:{}:{}:{}:{}",
             self.message_id, self.from, self.to, self.timestamp, msg_type
```
Also applies to: 411-422
src/storage/src/lib.rs (1)
42-43: Public API expansion: consider feature-gating or sealing `BinlogBatch` until it's real. Re-exporting `BinlogBatch` makes it part of the stable surface; if it's intentionally a placeholder, consider hiding it behind a `cluster` feature or documenting it as "unstable/experimental" to avoid long-term API constraints.
Also applies to: 69-70
src/storage/src/redis_hashes.rs (1)
293-314: Consider de-duplicating the repeated `create_new_hash` closures. The same "init meta + write 1+ fields via batch" logic is repeated in `hset`, `hmset`, `hsetnx`, `hincrby`, `hincrbyfloat`. A small private helper (or a generic helper taking an iterator of `(field, value_bytes)`) would cut repetition and reduce risk of divergence.
Also applies to: 637-665, 779-800, 904-927, 1060-1082
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- src/raft/src/network.rs
- src/storage/src/batch.rs
- src/storage/src/lib.rs
- src/storage/src/redis.rs
- src/storage/src/redis_hashes.rs
- src/storage/src/redis_lists.rs
- src/storage/src/redis_sets.rs
- src/storage/src/redis_strings.rs
- src/storage/src/redis_zsets.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/src/redis.rs
🧰 Additional context used
🧬 Code graph analysis (4)
src/storage/src/redis_lists.rs (3)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
- src/storage/src/storage_impl.rs (1): key (150-150)
- src/storage/src/base_key_format.rs (1): key (88-90)
src/storage/src/redis_zsets.rs (1)
- src/storage/src/batch.rs (2): new (113-125), new (221-223)
src/storage/src/batch.rs (2)
- src/storage/src/redis.rs (1): new (115-149)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
src/storage/src/redis_strings.rs (1)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)
🪛 GitHub Check: cargo clippy (ubuntu-latest)
src/storage/src/redis_hashes.rs
[failure] 119-119:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 350-350:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 400-400:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 698-698:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 751-751:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 836-836:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 869-869:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 963-963:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 1025-1025:
this expression creates a reference which is immediately dereferenced by the compiler
[failure] 1119-1119:
this expression creates a reference which is immediately dereferenced by the compiler
🪛 GitHub Check: lint
src/storage/src/redis_hashes.rs
Same 10 failures as the cargo clippy check above (lines 119, 350, 400, 698, 751, 836, 869, 963, 1025, 1119).
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: test-macos
- GitHub Check: test
- GitHub Check: test-windows
- GitHub Check: build and test (macos-latest)
- GitHub Check: build and test (windows-latest)
🔇 Additional comments (8)
src/storage/src/redis_zsets.rs (4)
29-31: Import change looks fine.
321-327: `zincrby` batch migration looks correct (delete old score key + write new pair + commit).
Also applies to: 383-410
792-877: `zrem` batching/commit-on-change looks good.
1473-1574: Range-removal paths migrated cleanly to batch+commit.
Also applies to: 1670-1697, 1712-1800
src/storage/src/redis_sets.rs (3)
25-26: Import change looks fine.
151-169: `sadd` batch refactor looks correct (single commit for meta + members).
635-659: `spop`/`smove` batching looks good; multi-key lock ordering in `smove` is a nice touch.
Also applies to: 795-837
src/storage/src/redis_strings.rs (1)
318-325: Single-key write paths migrated cleanly to batch+commit.
Also applies to: 389-396, 581-588, 635-642, 698-705, 759-766, 831-838, 1212-1219, 1284-1291, 1836-1884, 1938-1945, 1979-1986
```rust
pub trait Batch: Send {
    /// Add a put operation to the batch.
    ///
    /// # Arguments
    /// * `cf_idx` - The column family index to write to
    /// * `key` - The key to write
    /// * `value` - The value to write
    ///
    /// # Errors
    /// Returns an error if the column family index is invalid.
    fn put(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8], value: &[u8]) -> Result<()>;

    /// Add a delete operation to the batch.
    ///
    /// # Arguments
    /// * `cf_idx` - The column family index to delete from
    /// * `key` - The key to delete
    ///
    /// # Errors
    /// Returns an error if the column family index is invalid.
    fn delete(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8]) -> Result<()>;

    /// Commit all operations in the batch atomically.
    ///
    /// # Returns
    /// * `Ok(())` - if all operations were committed successfully
    /// * `Err(_)` - if the commit failed
    fn commit(self: Box<Self>) -> Result<()>;

    /// Get the number of operations in the batch.
    fn count(&self) -> u32;

    /// Clear all operations from the batch.
    fn clear(&mut self);
}
```
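For reference, the call-site shape this trait implies (a sketch only; the helper name and the concrete keys/values are illustrative, but the `create_batch()` → `put`/`delete` → `commit()` flow matches how the refactored modules use it):

```rust
fn write_meta_and_member_sketch(
    redis: &Redis,
    meta_key: &[u8],
    meta_value: &[u8],
    member_key: &[u8],
    member_value: &[u8],
) -> Result<()> {
    // One logical command -> one batch -> one atomic commit.
    let mut batch = redis.create_batch()?; // Box<dyn Batch>
    batch.put(ColumnFamilyIndex::MetaCF, meta_key, meta_value)?;
    batch.put(ColumnFamilyIndex::HashesDataCF, member_key, member_value)?;
    batch.commit() // consumes the boxed batch; no reuse after commit
}
```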
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Find ColumnFamilyIndex enum definition
echo "=== Looking for ColumnFamilyIndex enum definition ==="
rg -n "enum\s+ColumnFamilyIndex" -A 10
# 2) Find Redis::create_batch implementation
echo -e "\n=== Looking for create_batch implementation ==="
rg -n "fn\s+create_batch" -A 15
# 3) Check the error message in batch.rs
echo -e "\n=== Checking error messages in batch.rs ==="
rg -n "out of bounds" src/storage/src/batch.rs -B 2 -A 2
Repository: arana-db/kiwi
Length of output: 2513
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get the complete create_batch implementation
echo "=== Full create_batch implementation ==="
sed -n '335,365p' src/storage/src/redis.rs
Repository: arana-db/kiwi
Length of output: 1266
Fix error message to accurately report the maximum valid index.
The ColumnFamilyIndex → cf_handles mapping is stable: the enum defines contiguous discriminants (0–5) and Redis::create_batch() populates handles in matching order. However, the out-of-bounds error messages at lines 134 and 163 report self.cf_handles.len() as the max index, when the maximum valid index is actually len - 1.
Error message fix
```diff
 "Column family index {} out of bounds (max: {})",
 idx,
-self.cf_handles.len()
+self.cf_handles.len().saturating_sub(1)
```
+ self.cf_handles.len().saturating_sub(1)Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @src/storage/src/batch.rs around lines 56 - 90, The out-of-bounds error
messages for ColumnFamilyIndex use self.cf_handles.len() as the "max index" but
the true maximum valid index is self.cf_handles.len() - 1; update the error
construction in the code paths that validate cf_idx (referencing cf_handles and
the Redis::create_batch() usage) to report the maximum as
self.cf_handles.len().saturating_sub(1) (or handle empty cf_handles explicitly)
so the message shows the correct highest valid index instead of the length.
```rust
/// Binlog batch implementation for cluster (Raft) mode.
///
/// This implementation serializes operations to a binlog format and commits
/// through the Raft consensus layer.
///
/// TODO: Implement when Raft integration is ready.
#[allow(dead_code)]
pub struct BinlogBatch {
    // TODO: Add binlog entries
    // entries: Vec<BinlogEntry>,
    // append_log_fn: AppendLogFunction,
    count: u32,
}

#[allow(dead_code)]
impl BinlogBatch {
    /// Create a new BinlogBatch.
    ///
    /// # Arguments
    /// * `append_log_fn` - Function to append log entries to Raft
    pub fn new() -> Self {
        Self { count: 0 }
    }
}

impl Default for BinlogBatch {
    fn default() -> Self {
        Self::new()
    }
}

impl Batch for BinlogBatch {
    fn put(&mut self, _cf_idx: ColumnFamilyIndex, _key: &[u8], _value: &[u8]) -> Result<()> {
        // TODO: Implement when Raft integration is ready
        // Create binlog entry and add to entries
        self.count += 1;
        Ok(())
    }

    fn delete(&mut self, _cf_idx: ColumnFamilyIndex, _key: &[u8]) -> Result<()> {
        // TODO: Implement when Raft integration is ready
        // Create binlog entry and add to entries
        self.count += 1;
        Ok(())
    }

    fn commit(self: Box<Self>) -> Result<()> {
        // BinlogBatch commit is not yet implemented.
        // Return an error to prevent silent data loss.
        BatchSnafu {
            message: "BinlogBatch commit is not implemented - Raft integration pending".to_string(),
        }
        .fail()
    }

    fn count(&self) -> u32 {
        self.count
    }

    fn clear(&mut self) {
        // TODO: Clear entries
        self.count = 0;
    }
}
```
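For orientation, one possible shape for the TODO fields above, purely hypothetical until the Raft wiring exists:

```rust
// Hypothetical types matching the commented-out fields (entries / append_log_fn).
pub enum BinlogEntry {
    Put { cf: u8, key: Vec<u8>, value: Vec<u8> },
    Delete { cf: u8, key: Vec<u8> },
}

pub type AppendLogFunction = Box<dyn Fn(Vec<BinlogEntry>) -> Result<()> + Send + Sync>;

// In that design, put()/delete() would push a BinlogEntry instead of only counting,
// and commit() would hand the serialized entries to append_log_fn, returning Ok(())
// only after the Raft log entry is acknowledged.
```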
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find all references to BinlogBatch
echo "=== References to BinlogBatch ==="
rg -n "BinlogBatch" src/storage/src --type rust -A 2 -B 2
echo -e "\n=== Looking for cluster/raft mode logic ==="
rg -n "cluster|raft|Raft|mode" src/storage/src --type rust -l | head -20
Repository: arana-db/kiwi
Length of output: 4282
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check the overall structure of batch.rs
echo "=== Structure of batch.rs ==="
wc -l src/storage/src/batch.rs
# Look at the entire batch.rs to understand the architecture
echo -e "\n=== Full batch.rs content ==="
cat -n src/storage/src/batch.rs
Repository: arana-db/kiwi
Length of output: 10560
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for where batches are created and selected
echo "=== Looking for batch creation patterns ==="
rg -n "new()" src/storage/src --type rust -B 3 -A 1 | grep -E "(BinlogBatch|StorageBatch|fn new|pub fn|impl.*Batch)" -A 2 -B 2 || true
echo -e "\n=== Looking for mode-based selection ==="
rg -n "mode|Mode|select" src/storage/src --type rust -C 2 | head -50 || true
Repository: arana-db/kiwi
Length of output: 21390
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if BinlogBatch is instantiated anywhere outside of tests
echo "=== Direct BinlogBatch instantiation (excluding tests) ==="
rg -n "BinlogBatch\s*{|BinlogBatch::new|BinlogBatch::default" src/storage/src --type rust | grep -v "test_" || echo "No direct instantiations found outside of tests"
# Check cluster_storage.rs and related modules
echo -e "\n=== cluster_storage.rs content ==="
cat -n src/storage/src/cluster_storage.rs
echo -e "\n=== raft_integration.rs content ==="
cat -n src/storage/src/raft_integration.rs
Repository: arana-db/kiwi
Length of output: 13220
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Look for any mode or cluster detection in Redis initialization
echo "=== Redis initialization and mode handling ==="
sed -n '100,200p' src/storage/src/redis.rs | cat -n
echo -e "\n=== Full create_batch() context ==="
sed -n '320,350p' src/storage/src/redis.rs | cat -n
Repository: arana-db/kiwi
Length of output: 6110
Address the fragile guard: BinlogBatch selection should be explicitly guarded, not rely on commented code.
Currently, create_batch() always returns RocksBatch because the cluster-mode selection code (lines 336-339 in redis.rs) is commented out and the required append_log_fn field doesn't exist. While this prevents runtime usage today, the safeguard depends on commented code rather than explicit checks.
When Raft integration resumes, add a cluster-mode flag to the Redis struct and update create_batch() to gate BinlogBatch selection with an explicit check—or use a feature flag—to prevent accidental activation before Raft wiring is complete. The intentional error in BinlogBatch::commit() is good, but an early error or panic at batch creation time (if cluster mode is improperly enabled) would be stronger.
🤖 Prompt for AI Agents
In @src/storage/src/batch.rs around lines 201 - 264, The code relies on
commented-out selection logic and can accidentally return a BinlogBatch; add an
explicit guard in Redis and create_batch(): add a bool field like `cluster_mode`
to the `Redis` struct (or use a feature flag), then update `create_batch()` to
only construct/return `BinlogBatch` when `cluster_mode` is true and the required
Raft wiring (e.g., append log callback) is present; otherwise always return
`RocksBatch` and if `cluster_mode` is true but append-log wiring is missing,
return an immediate error or panic rather than creating a BinlogBatch (leave
`BinlogBatch::commit()` intentional error as a secondary safeguard).
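A hedged sketch of that guard; the `cluster_mode` and `append_log_fn` fields (and a `BinlogBatch::new` that takes a callback) are hypothetical until the Raft integration lands, and `RocksBatch::new(self)` stands in for however the batch is actually constructed:

```rust
pub fn create_batch(&self) -> Result<Box<dyn Batch>> {
    if self.cluster_mode {
        // Refuse to hand out a BinlogBatch until the append-log callback is wired up.
        // (append_log_fn is assumed to be an Arc-wrapped callback so it can be cloned.)
        let append_log_fn = self.append_log_fn.as_ref().ok_or_else(|| {
            BatchSnafu {
                message: "cluster mode enabled but Raft append-log is not configured".to_string(),
            }
            .build()
        })?;
        return Ok(Box::new(BinlogBatch::new(append_log_fn.clone())));
    }
    // Standalone mode: plain RocksDB write batch.
    Ok(Box::new(RocksBatch::new(self)?))
}
```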
```rust
        // Commit batch
        let mut batch = self.create_batch()?;
        for key_to_del in deletes {
            batch.delete(ColumnFamilyIndex::ListsDataCF, &key_to_del)?;
        }
        for (k, v) in puts {
            batch.put(ColumnFamilyIndex::ListsDataCF, &k, &v)?;
        }
        batch.put(ColumnFamilyIndex::MetaCF, &meta_key, &parsed_meta.value())?;
        batch.commit()?;
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1. Find and inspect the get_db_and_cfs macro definition
echo "=== Searching for get_db_and_cfs macro ==="
rg -n "macro_rules!\s*get_db_and_cfs" -A 15 src/
# 2. Check db.get() calls in redis_lists.rs (how meta is read)
echo -e "\n=== db.get calls in redis_lists.rs ==="
rg -n "db\.get\(" src/storage/src/redis_lists.rs -B 1 -A 1
# 3. Check MetaCF references (how meta is written)
echo -e "\n=== MetaCF references in redis_lists.rs ==="
rg -n "MetaCF|ColumnFamilyIndex::Meta" src/storage/src/redis_lists.rs | head -30
Repository: arana-db/kiwi
Length of output: 3801
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if there's a get_cf method or how db is defined in context
echo "=== Checking Engine trait / db definition ==="
fd -t f "engine.rs|db.rs" src/storage/src/ | head -5
# Look for Engine trait definition and get methods
echo -e "\n=== Looking for Engine get/get_cf definitions ==="
rg -n "fn get\(" src/storage/src/ -B 2 -A 5 | head -40
rg -n "fn get_cf\(" src/storage/src/ -B 2 -A 5 | head -40
Repository: arana-db/kiwi
Length of output: 1215
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check around the specific line 236-246 to understand the context
echo "=== Context around lines 236-246 in redis_lists.rs ==="
sed -n '225,260p' src/storage/src/redis_lists.rs
# Check other locations mentioned
echo -e "\n=== Context around lines 341-349 ==="
sed -n '330,360p' src/storage/src/redis_lists.rs
Repository: arana-db/kiwi
Length of output: 2521
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the type of db field - check the struct definition
echo "=== Checking db field type in redis_lists.rs ==="
rg -n "db\s*:" src/storage/src/redis_lists.rs -B 5 -A 2 | head -30
# Check the struct definition
echo -e "\n=== Looking for RedisLists/RedisImpl struct ==="
rg -n "^pub struct|^struct" src/storage/src/redis_lists.rs -A 10 | head -40
# Look at get method signature on db
echo -e "\n=== Check how db.get is called in context ==="
sed -n '65,85p' src/storage/src/redis_lists.rs
Repository: arana-db/kiwi
Length of output: 105
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the actual DB wrapper type - check imports and trait bounds
echo "=== Imports and use statements in redis_lists.rs ==="
head -50 src/storage/src/redis_lists.rs
# Check what RocksdbStorage or similar is
echo -e "\n=== Search for storage/db wrapper ==="
rg -n "type DB|struct.*DB|impl.*DB" src/storage/src/ -B 2 -A 5 | head -50
Repository: arana-db/kiwi
Length of output: 2655
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# The key question: is db.get() CF-aware or always reads default CF?
# Check if there's a method that takes CF as parameter
echo "=== Checking db methods available ==="
rg -n "fn get|\.get\(" src/storage/src/redis_lists.rs | head -20
# Look for the underlying DB implementation
echo -e "\n=== Check struct definition for db field ==="
sed -n '1,100p' src/storage/src/redis_lists.rs | grep -A 50 "pub struct RedisImpl"
Repository: arana-db/kiwi
Length of output: 1065
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find Engine trait definition
echo "=== Engine trait definition ==="
rg -n "trait Engine|pub fn get" --type rust -B 2 -A 5 | grep -A 20 "trait Engine"
# Check kstd or where Engine is defined
echo -e "\n=== Looking for Engine trait (may be external) ==="
rg -n "impl Engine|trait Engine" src/ -B 2 -A 10 | head -60
# Simpler: look at the actual db.get signature in context around line 513
echo -e "\n=== Looking at line 511-520 to understand db type ==="
sed -n '509,525p' src/storage/src/redis_lists.rs
Repository: arana-db/kiwi
Length of output: 3566
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find how db is obtained from get_db_and_cfs macro result
echo "=== Expand get_db_and_cfs usage context ==="
sed -n '509,520p' src/storage/src/redis_lists.rs
# The key insight: check what db.get does - is there a get_cf or similar?
echo -e "\n=== Check Redis impl for methods that call db.get ==="
rg -n "db\.get\(|db\.get_cf\(" src/storage/src/ | head -20
# Look at how metadata CF handle is used after get_db_and_cfs
echo -e "\n=== Check if cfs from macro is ever used for reads ==="
rg -n "let.*cfs.*=.*get_db_and_cfs" src/storage/src/redis_lists.rs -A 5
Repository: arana-db/kiwi
Length of output: 5662
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Critical finding: redis_sets.rs uses db.get_cf() but redis_lists.rs uses db.get()
# Check if Engine trait has a get_cf method
echo "=== Full Engine trait signature ==="
sed -n '27,100p' src/engine/src/engine.rs
# Check if get_cf is in RocksdbEngine impl
echo -e "\n=== RocksdbEngine methods ==="
sed -n '47,150p' src/engine/src/rocksdb_engine.rs
Repository: arana-db/kiwi
Length of output: 5322
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# The mismatch: redis_sets uses get_cf, redis_lists does not
# Let's check one example from sets to compare
echo "=== Example from redis_sets.rs (correct pattern) ==="
sed -n '225,235p' src/storage/src/redis_sets.rs
echo -e "\n=== Corresponding example from redis_lists.rs (problematic) ==="
sed -n '800,810p' src/storage/src/redis_lists.rs
Repository: arana-db/kiwi
Length of output: 110
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Let's check the specific locations to see the pattern confirmed
echo "=== Line 511 (get meta from default CF) vs batch writes to MetaCF ==="
sed -n '509,525p' src/storage/src/redis_lists.rs
echo -e "\n=== Verify: line 68 also uses db.get without CF ==="
sed -n '65,75p' src/storage/src/redis_lists.rs
echo -e "\n=== Compare with redis_sets.rs which uses get_cf correctly ==="
sed -n '226,235p' src/storage/src/redis_sets.rs
Repository: arana-db/kiwi
Length of output: 1677
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Quick verification: count affected locations in redis_lists.rs
echo "=== Total count of db.get calls reading metadata ==="
rg -c "db\.get\(&meta_key\)" src/storage/src/redis_lists.rs
# Verify the macro properly returns CF handle for MetaCF case
echo -e "\n=== Confirming CF is available in macro return ==="
sed -n '59,65p' src/storage/src/redis_lists.rs
# Show one corrected example pattern from sets
echo -e "\n=== Correct pattern from redis_sets.rs ==="
sed -n '227,232p' src/storage/src/redis_sets.rs

Repository: arana-db/kiwi
Length of output: 716
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Final confirmation: show side-by-side of the bug in one function
echo "=== lpush_core function: writes to MetaCF but reads from default CF ==="
sed -n '80,90p' src/storage/src/redis_lists.rs
echo "..."
sed -n '240,250p' src/storage/src/redis_lists.rs
# Verify this pattern applies across multiple functions
echo -e "\n=== lpop: same bug pattern ==="
sed -n '300,310p' src/storage/src/redis_lists.rs
echo "..."
sed -n '343,350p' src/storage/src/redis_lists.rs
# Show the correct way should be
echo -e "\n=== What it should look like (from redis_sets.rs) ==="
sed -n '209,212p' src/storage/src/redis_sets.rs

Repository: arana-db/kiwi
Length of output: 1978
Critical: Metadata reads and writes target different column families.
All metadata reads use db.get(&meta_key) (default CF) while writes use batch.put(ColumnFamilyIndex::MetaCF, ...) (MetaCF). This breaks all list operations—written metadata will never be read back.
The macro obtains the MetaCF handle but it's discarded (_cfs). Follow the pattern from redis_sets.rs: obtain the CF handle from the macro and use db.get_cf(&cf_meta, &meta_key) for all metadata reads.
Affected locations (12 total reads, all functions):

- Line 68: `push_core` (existence check for empty values)
- Line 83: `push_core` (main metadata read)
- Line 302: `lpop_internal`
- Line 373: `rpop_internal`
- Line 444: `rpop_internal` (internal call)
- Line 513: `llen`
- Line 535: `lindex`
- Line 588: `lrange`
- Line 655: `ltrim`
- Line 724: `lrem`
- Line 823: `linsert`
- Line 968: `rpoplpush`
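A minimal sketch of the corrected read path, assuming the `get_db_and_cfs` macro can hand back the MetaCF handle the way `redis_sets.rs` obtains it (the macro invocation and variable names here are illustrative, not the actual code):

```rust
// Illustrative fix: metadata reads must go through the same CF the batch writes to.
let (db, cfs) = get_db_and_cfs!(self, [ColumnFamilyIndex::MetaCF]); // assumed macro shape
let cf_meta = &cfs[0];

// Old (buggy): reads the default CF, so it never sees what batch.put(MetaCF, ..) wrote.
// let raw_meta = db.get(&meta_key).context(RocksSnafu)?;

// Fixed: read from MetaCF, mirroring the redis_sets.rs pattern quoted above.
let raw_meta = db.get_cf(cf_meta, &meta_key).context(RocksSnafu)?;
```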
```diff
  // Commit batch
  let mut batch = self.create_batch()?;
  for key_to_del in keys_to_delete {
      batch.delete(ColumnFamilyIndex::ListsDataCF, &key_to_del)?;
  }
  // Always preserve the metadata for empty lists to allow lpushx/rpushx operations
- batch.put(&meta_key, parsed_meta.value());
- db.write(batch).context(RocksSnafu)?;
+ batch.put(ColumnFamilyIndex::MetaCF, &meta_key, &parsed_meta.value())?;
+ batch.commit()?;
```
Empty-list handling is inconsistent (preserve meta in lpop/rpop vs delete meta elsewhere).
lpop/rpop/rpop_internal always write metadata even when the list becomes empty (Line 346 / Line 418 / Line 489), but ltrim/lrem may delete meta when empty. This can change “key exists” behavior and lpushx/rpushx semantics. Consider making empty-list deletion/preservation consistent across list ops (and aligned with expected Redis behavior for empty lists).
Also applies to: 412-420, 483-491, 794-806, 923-937
🤖 Prompt for AI Agents
In @src/storage/src/redis_lists.rs around lines 341 - 349, The code
inconsistently handles empty lists: lpop/rpop/rpop_internal currently preserve
list metadata when the list becomes empty while ltrim/lrem delete it, causing
differing "key exists" and lpushx/rpushx behavior; make this consistent by
changing the ltrim and lrem code paths to preserve metadata instead of deleting
the MetaCF entry when a list becomes empty (or alternatively change lpop/rpop to
delete if you prefer that semantics), updating the batch operations in the
affected functions (ltrim, lrem, and their internal helpers) so they write the
parsed_meta.value() to ColumnFamilyIndex::MetaCF rather than issuing
batch.delete, and ensure the same approach is applied to all referenced code
paths (including the rpop_internal and any other empty-list branches) so
key-exists semantics match across operations.
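If the "preserve metadata" direction is taken, the empty-list branches in `ltrim`/`lrem` would switch from a meta delete to a meta rewrite. A rough sketch using the names from the hunks above (`remaining_count` and the count setter are assumed helpers, not the actual API):

```rust
// Sketch: keep key-exists semantics identical to lpop/rpop when a list drains.
if remaining_count == 0 {
    // Instead of: batch.delete(ColumnFamilyIndex::MetaCF, &meta_key)?;
    // keep the (now empty) metadata so lpushx/rpushx still treat the key as existing.
    parsed_meta.set_count(0); // assumed setter; the real meta type may expose this differently
}
batch.put(ColumnFamilyIndex::MetaCF, &meta_key, &parsed_meta.value())?;
batch.commit()?;
```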
```diff
  // Use Batch for atomic operation
  let mut batch = self.create_batch()?;

  // Process all key-value pairs
  for (key, value) in kvs {
      let string_key = BaseKey::new(key);
      let string_value = StringValue::new(value.to_owned());
-     batch.put_cf(&cf, string_key.encode()?, string_value.encode());
+     batch.put(
+         ColumnFamilyIndex::MetaCF,
+         &string_key.encode()?,
+         &string_value.encode(),
+     )?;
  }

- // Atomic write of all key-value pairs
- db.write_opt(batch, &self.write_options)
-     .context(RocksSnafu)?;
-
- Ok(())
+ batch.commit()
  }
```
mset should lock keys (otherwise it bypasses the lock contract used by read-modify-write ops).
mset writes atomically at RocksDB level, but it can interleave with other commands that rely on per-key locks for correctness (e.g., incr_decr). Consider sorting keys and acquiring ScopeRecordLocks for all of them before the batch write.
🤖 Prompt for AI Agents
In @src/storage/src/redis_strings.rs around lines 1063 - 1079, The mset
implementation currently batches puts and commits without acquiring per-key
locks, which can race with ops that rely on ScopeRecordLock (e.g., incr_decr);
modify the mset function to first collect and sort the keys (use
BaseKey::new(key).encode() or the key strings), acquire a ScopeRecordLock for
each key in that sorted order (holding all locks), then perform
create_batch()/batch.put(...) for each kv and finally batch.commit(), releasing
locks after commit; ensure lock acquisition is exception-safe (release on error)
and reference ScopeRecordLock, mset, create_batch, batch.put, and batch.commit
in your change.
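A rough sketch of the suggested shape, assuming `kvs` is a slice of `(&[u8], &[u8])` pairs; the `ScopeRecordLock::new` constructor and the `lock_mgr` field are assumptions about this codebase, while the key/value encoding lines mirror the hunk above:

```rust
// Sketch: take per-key locks in a canonical (sorted, deduplicated) order before
// building the batch, so mset cannot interleave with lock-based ops like incr_decr.
let mut lock_keys: Vec<&[u8]> = kvs.iter().map(|(k, _)| *k).collect();
lock_keys.sort_unstable();
lock_keys.dedup();

// Hold every guard until after commit; dropping the Vec releases the locks.
let _guards: Vec<ScopeRecordLock> = lock_keys
    .iter()
    .map(|k| ScopeRecordLock::new(&self.lock_mgr, k)) // assumed constructor
    .collect();

let mut batch = self.create_batch()?;
for (key, value) in kvs {
    let string_key = BaseKey::new(key);
    let string_value = StringValue::new(value.to_owned());
    batch.put(
        ColumnFamilyIndex::MetaCF,
        &string_key.encode()?,
        &string_value.encode(),
    )?;
}
batch.commit()
```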
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
src/raft/src/network.rs (1)
383-405: Critical: HMAC payload doesn't cover message body (integrity bypass).

Right now `msg_type` is the only commitment to `self.message` for most variants, so an attacker can alter (e.g.) `AppendEntries` fields without changing `data_for_hmac`, and the HMAC would still verify. Please include the full `RaftMessage` (or a digest of it) in the authenticated bytes, and centralize the construction to avoid drift.

Proposed fix (commit HMAC to full message content; remove duplicated match)

```diff
 impl MessageEnvelope {
+    fn hmac_payload(&self) -> RaftResult<Vec<u8>> {
+        // IMPORTANT: commit to the full message content (not just its variant name)
+        bincode::serialize(&(self.message_id, self.from, self.to, self.timestamp, &self.message))
+            .map_err(|e| {
+                RaftError::Serialization(serde_json::Error::io(std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    e.to_string(),
+                )))
+            })
+    }
+
     /// Add authentication to the message
     pub fn add_authentication(&mut self, auth: &NodeAuth) -> RaftResult<()> {
-        // Create serializable data without HMAC for authentication
-        // Use a simplified message representation for HMAC
-        let msg_type = match &self.message {
-            RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
-            RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
-            RaftMessage::Vote(_) => "Vote".to_string(),
-            RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
-            RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
-            RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
-            RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
-            RaftMessage::HeartbeatResponse { from, success } => {
-                format!("HeartbeatResponse:{}:{}", from, success)
-            }
-        };
-        let data_for_hmac = format!(
-            "{}:{}:{}:{}:{}",
-            self.message_id, self.from, self.to, self.timestamp, msg_type
-        );
-
-        self.hmac = Some(auth.generate_hmac(data_for_hmac.as_bytes())?);
+        let payload = self.hmac_payload()?;
+        self.hmac = Some(auth.generate_hmac(&payload)?);
         Ok(())
     }

     /// Verify message authentication
     pub fn verify_authentication(&self, auth: &NodeAuth) -> bool {
         if let Some(ref expected_hmac) = self.hmac {
-            // Recreate the same data format used for HMAC generation
-            let msg_type = match &self.message {
-                RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
-                RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
-                RaftMessage::Vote(_) => "Vote".to_string(),
-                RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
-                RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
-                RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
-                RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
-                RaftMessage::HeartbeatResponse { from, success } => {
-                    format!("HeartbeatResponse:{}:{}", from, success)
-                }
-            };
-            let data_for_hmac = format!(
-                "{}:{}:{}:{}:{}",
-                self.message_id, self.from, self.to, self.timestamp, msg_type
-            );
-
-            auth.verify_hmac(data_for_hmac.as_bytes(), expected_hmac)
+            let payload = match self.hmac_payload() {
+                Ok(v) => v,
+                Err(_) => return false,
+            };
+            auth.verify_hmac(&payload, expected_hmac)
         } else {
             false
         }
     }
 }
```

Also applies to: 408-432
src/storage/src/redis_sets.rs (1)
2375-2398: Potential atomicity issue: two separate batch commits.

Similar to `sdiffstore`, this helper method performs two sequential batch commits: clearing the destination (lines 2375-2383 or 2387-2394) and adding members via `sadd` (line 2398). This breaks atomicity for `sinterstore` and `sunionstore` operations that depend on this helper.

Both clearing and adding operations should be combined into a single atomic batch to prevent partial state if the second operation fails.
src/storage/src/redis_zsets.rs (3)
59-65: Bug: score equality check in `zadd` compares the wrong value.

`if (existing_score, sm.score).1.abs() < f64::EPSILON` evaluates `sm.score.abs()`, not `|existing_score - sm.score|`. This makes "no-op when score unchanged" behave incorrectly and triggers unnecessary delete+put.

Proposed fix

```diff
-            if (existing_score, sm.score).1.abs() < f64::EPSILON {
+            if (existing_score - sm.score).abs() < f64::EPSILON {
                 // Score is the same, skip
                 continue;
             } else {
```

Also applies to: 101-180, 188-213
321-327: `zincrby`: batch update is good, but lock-dropping + re-calling `zadd` can break atomicity under concurrency.

When meta is invalid/missing, the function drops `_lock` then calls `zadd`, allowing another writer to interleave. Consider an internal `zadd_no_lock` (or a shared helper that accepts "lock already held") to keep the whole operation linearizable.

Also applies to: 383-410
1234-1265: `zinterstore`/`zunionstore`: destination delete + later `zadd` is not atomic.

You delete destination under a lock, release the lock, then later `zadd` repopulates. Another operation on `destination` can slip in between. If strict Redis semantics matter here, this should be refactored to a single locked write plan.

src/storage/src/redis_lists.rs (1)
319-334: Metadata decrement should match actual keys found; add validation or assertions when data keys are missing.
`keys_to_delete` is populated only when `get_cf` returns a value, but `pop_count` unconditionally decrements the metadata count and indices. If any expected data key is absent, the count will drift even though fewer items were actually popped. The same issue exists in RPOP, RPOPLPUSH, and LREM. Either assert that all keys exist (if guaranteed by design) or adjust the count decrement to match `keys_to_delete.len()` or `result.len()`.

Also applies to: 341-349, 390-405, 412-420, 461-476, 483-491
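A hedged sketch of the second option, reusing the variable names quoted in this comment; the metadata accessors are assumptions about the actual meta value type:

```rust
// Sketch: decrement by what was actually found instead of the requested pop_count.
let actually_popped = keys_to_delete.len() as u64;
debug_assert_eq!(
    actually_popped, pop_count,
    "list data key missing; metadata count would drift"
);
// `count()` / `set_count()` are placeholders for the real meta-value accessors.
parsed_meta.set_count(parsed_meta.count() - actually_popped);
batch.put(ColumnFamilyIndex::MetaCF, &meta_key, &parsed_meta.value())?;
```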
🧹 Nitpick comments (7)
src/storage/src/redis_strings.rs (3)
581-588: Batch migration for single-key writes looks consistent (put/delete + commit).

Nice mechanical refactor: each mutation now commits through the Batch abstraction, aligning with the PR goal (standalone vs cluster).
Consider a tiny helper to reduce repetition/error-surface (create batch → put MetaCF → commit), since this pattern appears many times in this file.
Also applies to: 635-642, 698-705, 759-766, 831-838, 1212-1219, 1284-1291, 1429-1436, 1836-1843, 1856-1863, 1877-1884, 1938-1945, 1979-1986
2110-2151: `del_key`: good "collect then batch delete", but watch memory/latency for large composite keys.

Collecting all derived keys across CFs into `keys_to_delete` can be large and then written as one huge batch; consider chunked commits (e.g., commit every N deletions) to cap memory and avoid oversized WriteBatch.

2226-2261: `flush_db`: same batching concern; consider chunking or CF-level primitives.

For large DBs, collecting all keys into memory is costly. If RocksDB drop/truncate CF isn't an option here, chunked batch commits would at least bound memory.
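A chunked variant could look roughly like this (the chunk size, the single-CF loop, and the error handling are illustrative; note that chunking gives up all-or-nothing atomicity for the whole delete):

```rust
// Sketch: bound the size of any single batch by committing every CHUNK deletions.
const CHUNK: usize = 10_000; // illustrative threshold

let mut batch = self.create_batch()?;
let mut pending = 0usize;
for key_to_del in &keys_to_delete {
    // The real code picks the CF per derived key; MetaCF is just a placeholder here.
    batch.delete(ColumnFamilyIndex::MetaCF, key_to_del)?;
    pending += 1;
    if pending == CHUNK {
        batch.commit()?;
        batch = self.create_batch()?;
        pending = 0;
    }
}
if pending > 0 {
    batch.commit()?;
}
```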
src/storage/src/redis_lists.rs (2)
718-719: `ltrim`: meta delete vs update is clear; consider chunking deletes for big lists.

Logic is readable and the batch write is clean. For very large lists, the delete-key accumulation can be big; chunking commits would reduce peak memory.
Also applies to: 754-757, 764-767, 773-774, 781-782, 791-792, 794-806
888-892: `lrem`: batch rewrite is fine; watch O(N) memory for large lists.

Reads all elements and rewrites the whole list; if that's already the intended approach, the batch refactor is consistent. If not, this is a hotspot to optimize later.
Also applies to: 898-904, 914-915, 923-937
src/storage/src/redis_hashes.rs (2)
293-314: `hset`: batch writes look good; consider deduplicating the "create_new_hash" logic across methods.

The inline closure pattern repeats in `hset`, `hmset`, `hsetnx`, `hincrby`, `hincrbyfloat`. A small private helper (e.g., `fn init_hash_with_fields(&self, ...)`) would reduce drift risk.

Also applies to: 346-358, 376-383, 395-408, 414-416
904-926: Error messages for numeric overflow/underflow should be consistent across string and hash operations.

There are inconsistencies in error messaging:

- Integer overflow: hashes report `"integer overflow"` (line 994) while strings report `"increment or decrement would overflow"` (strings.rs:1203)
- Non-integer parse errors: hashes report `"hash value is not an integer"` (line 987) while strings report `"value is not an integer or out of range"` (strings.rs:1191)
- Float parse errors and NaN/Infinity checks are already aligned
For consistency with Redis semantics and client expectations, align these messages across both data types.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- src/raft/src/network.rs
- src/storage/src/batch.rs
- src/storage/src/lib.rs
- src/storage/src/redis.rs
- src/storage/src/redis_hashes.rs
- src/storage/src/redis_lists.rs
- src/storage/src/redis_sets.rs
- src/storage/src/redis_strings.rs
- src/storage/src/redis_zsets.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/storage/src/lib.rs
- src/storage/src/redis.rs
- src/storage/src/batch.rs
🧰 Additional context used
🧬 Code graph analysis (3)
src/storage/src/redis_strings.rs (2)
- src/storage/src/redis.rs (1): new (115-149)
- src/engine/src/rocksdb_engine.rs (1): db (38-40)

src/storage/src/redis_hashes.rs (3)
- src/storage/src/base_meta_value_format.rs (2): new (56-63), new (103-147)
- src/storage/src/base_key_format.rs (1): key (88-90)
- src/storage/src/redis_multi.rs (1): exists (130-152)

src/storage/src/redis_zsets.rs (1)
- src/storage/src/batch.rs (2): new (113-125), new (221-223)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cargo clippy (macos-latest)
- GitHub Check: build and test (macos-latest)
- GitHub Check: build and test (windows-latest)
🔇 Additional comments (15)
src/storage/src/redis_sets.rs (5)
25-25: LGTM! Clean import refactoring.

The removal of `WriteBatch` from the imports is correct and aligns with the batch abstraction introduced in this PR.

151-168: LGTM! Atomic batch operations correctly implemented.

The refactoring properly uses the batch abstraction with a single atomic commit for all metadata and member updates.

472-523: LGTM! Proper batch accumulation pattern.

The code correctly collects keys to delete, then applies all deletions and metadata updates in a single atomic batch commit.

635-659: LGTM! Atomic batch operations for member removal.

The batch correctly accumulates all member deletions and metadata updates before committing atomically.

795-837: LGTM! Atomic cross-set move operation.

The batch correctly ensures atomicity for the move operation across both source and destination sets with a single commit.
src/storage/src/redis_lists.rs (2)
105-108: `push_core`: delete/put ordering and meta update in the same commit looks right.

Collecting `deletes` then `puts`, and finally writing meta in the same batch should preserve the intended atomicity for list mutations (especially when reorganizing and bumping version).

Also applies to: 125-126, 156-157, 172-173, 184-185, 208-209, 224-225, 236-246

1032-1035: `linsert`: version bump + delete old version keys + write new version keys is a solid pattern.

This avoids in-place shifting complexity and keeps atomicity via a single batch commit.
Also applies to: 1041-1042, 1056-1057, 1063-1073
src/storage/src/redis_zsets.rs (3)
30-30: Import change only.
792-798: `zrem`: batch delete + meta update in one commit is correct.

Nice that meta removal when count hits zero is handled in the same batch.
Also applies to: 824-877
1473-1479: Range removals: batched member+score deletions + meta update are consistent.

Pattern is uniform and easy to reason about after the refactor.
Also applies to: 1547-1570, 1670-1693, 1712-1718, 1773-1796
src/storage/src/redis_hashes.rs (4)
23-23: Import change only.
86-122: `hdel`: collect-then-batch delete is solid.

Good: existence checks are done on the snapshot, then deletes + meta update are committed together via Batch.

637-665: `hmset`: good batching and correct "update meta only if new fields added" optimization.

`data_to_write` + conditional meta put keeps writes minimal while still committing atomically.

Also applies to: 694-711, 715-755, 758-759

779-800: `hsetnx`: batch path matches expected semantics.

The new-field path updates meta + data in one commit; existing field returns early without writes.
Also applies to: 832-844, 865-877, 883-885
src/storage/src/redis_strings.rs (1)
318-325: No issue detected. MetaCF is explicitly the default RocksDB column family (mapped to "default" in the CF name function). Reads via `db.get_opt()` and writes via `batch.put(ColumnFamilyIndex::MetaCF, ...)` both target the same CF, so there is no read/write inconsistency.
```diff
     let mut batch = self.create_batch()?;
     for key_to_del in &keys_to_delete {
         batch.delete(ColumnFamilyIndex::SetsDataCF, key_to_del)?;
     }
     if should_delete_meta {
         batch.delete(ColumnFamilyIndex::MetaCF, &dest_base_meta_key)?;
     }
     batch.commit()?;
     return Ok(0);
 }

 // Add all difference members to the destination set
 let member_refs: Vec<&[u8]> = diff_members.iter().map(|s| s.as_bytes()).collect();

 // Write the batch to clear destination first
-db.write(batch).context(RocksSnafu)?;
+let mut batch = self.create_batch()?;
+for key_to_del in &keys_to_delete {
+    batch.delete(ColumnFamilyIndex::SetsDataCF, key_to_del)?;
+}
+if should_delete_meta {
+    batch.delete(ColumnFamilyIndex::MetaCF, &dest_base_meta_key)?;
+}
+batch.commit()?;

 // Now add the new members
 let added = self.sadd(destination, &member_refs)?;
```
Potential atomicity issue: two separate batch commits.
The operation performs two sequential batch commits: (1) clearing the destination set (lines 2248-2255 and 2263-2270), and (2) adding new members via sadd (line 2273). If the second operation fails or the process crashes between commits, the destination set will be left empty instead of being restored to its original state or containing the expected difference result.
Consider refactoring to combine all operations (clear + add) into a single batch commit to ensure atomicity. This would require inlining the add logic instead of calling sadd, or extending sadd to accept an optional existing batch.
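A single-commit shape might look roughly like this; `sadd_into_batch` is a hypothetical helper (it does not exist in this PR), standing in for either inlining the add logic or letting `sadd` stage into a caller-provided batch:

```rust
// Sketch: clear the old destination and stage the new members in ONE batch,
// so a failure between steps can never leave the destination half-written.
let mut batch = self.create_batch()?;

// 1) Stage removal of the existing destination contents.
for key_to_del in &keys_to_delete {
    batch.delete(ColumnFamilyIndex::SetsDataCF, key_to_del)?;
}
batch.delete(ColumnFamilyIndex::MetaCF, &dest_base_meta_key)?;

// 2) Stage the new meta + member writes (hypothetical helper; today this logic
//    lives inside `sadd`, which commits a batch of its own).
self.sadd_into_batch(&mut batch, destination, &member_refs)?;

// 3) One atomic commit for the whole *STORE operation.
batch.commit()?;
```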
```diff
  // Use Batch for atomic operation
  let mut batch = self.create_batch()?;

  // Process all key-value pairs
  for (key, value) in kvs {
      let string_key = BaseKey::new(key);
      let string_value = StringValue::new(value.to_owned());
-     batch.put_cf(&cf, string_key.encode()?, string_value.encode());
+     batch.put(
+         ColumnFamilyIndex::MetaCF,
+         &string_key.encode()?,
+         &string_value.encode(),
+     )?;
  }

- // Atomic write of all key-value pairs
- db.write_opt(batch, &self.write_options)
-     .context(RocksSnafu)?;
-
- Ok(())
+ batch.commit()
  }
```
MSET/MSETNX: batch commit achieves storage-level atomicity; verify semantics under concurrency.
MSET is fine as “all-or-nothing write” at RocksDB batch level. For MSETNX, the existence check happens before the batch write; without multi-key locking or a snapshot+CAS-like guarantee, concurrent writers can violate Redis’s “set none if any exists” semantics. If that’s an accepted limitation today, it’s worth documenting near msetnx.
#!/bin/bash
set -euo pipefail
# See whether multi-key ops have a canonical lock-order helper (or are known best-effort).
rg -n "LockMgr|ScopeRecordLock|lock order|deadlock|multi[-_ ]key" -S src/storage/srcAlso applies to: 1106-1155
Related Issue

Closes #222

Background

The storage layer currently uses RocksDB's WriteBatch directly for batched writes. That approach is tightly coupled to the underlying storage engine and cannot support the upcoming cluster mode, where writes must go through Raft consensus.

Solution

Introduce a Batch trait abstraction layer that decouples batched write operations from the concrete implementation. A rough sketch of the shape follows.
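The sketch below approximates the trait as it is used at the call sites in this PR; the exact signatures live in `src/storage/src/batch.rs` and may differ in detail:

```rust
// Approximation of the Batch abstraction, inferred from how the migrated
// call sites use it; treat the exact types and error handling as assumptions.
pub trait Batch {
    fn put(&mut self, cf: ColumnFamilyIndex, key: &[u8], value: &[u8]) -> Result<()>;
    fn delete(&mut self, cf: ColumnFamilyIndex, key: &[u8]) -> Result<()>;
    fn commit(self: Box<Self>) -> Result<()>;
}

// Call-site pattern repeated across the migrated files:
//     let mut batch = self.create_batch()?;   // factory picks the implementation
//     batch.put(ColumnFamilyIndex::MetaCF, &meta_key, &meta_value)?;
//     batch.delete(ColumnFamilyIndex::SetsDataCF, &member_key)?;
//     batch.commit()?;                         // one atomic write
```

In standalone mode the batch is backed by a RocksDB WriteBatch; in cluster mode the same call sites can be served by a log/Raft-backed implementation without changing command code.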
Main Changes

- Added a `batch.rs` module containing the Batch trait and two implementations
- Added a `create_batch()` factory method
- Migrated call sites to the Batch API: `redis_strings.rs` (2 sites), `redis_hashes.rs` (6 sites), `redis_lists.rs` (7 sites), `redis_sets.rs` (6 sites), `redis_zsets.rs` (8 sites)

Testing
Checklist
Summary by CodeRabbit
New Features
Refactor
Tests & Docs