From 5ca1ae344d35570051234123ccef758c5c5b6bf6 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 16:49:32 +0100 Subject: [PATCH 1/2] composefs-oci: add support for splitfdstream Assisted-by: Claude Opus 4.6 Signed-off-by: Giuseppe Scrivano --- crates/composefs-oci/Cargo.toml | 4 +- crates/composefs-oci/src/lib.rs | 8 +- crates/composefs-oci/src/oci_image.rs | 47 +++ crates/composefs-oci/src/splitfdstream.rs | 439 ++++++++++++++++++++++ 4 files changed, 496 insertions(+), 2 deletions(-) create mode 100644 crates/composefs-oci/src/splitfdstream.rs diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 6c38d3a9..f027f003 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -16,11 +16,13 @@ fn-error-context = "0.2" async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] } bytes = { version = "1", default-features = false } composefs = { workspace = true } +splitfdstream = { workspace = true } containers-image-proxy = { version = "0.9.2", default-features = false } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] } oci-spec = { version = "0.8.0", default-features = false } -rustix = { version = "1.0.0", features = ["fs"] } +rustix = { version = "1.0.0", features = ["fs", "net"] } +serde_json = { version = "1.0", default-features = false, features = ["std"] } sha2 = { version = "0.10.1", default-features = false } tar = { version = "0.4.38", default-features = false } tokio = { version = "1.24.2", features = ["rt-multi-thread"] } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index bbb41c84..ce78254d 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -15,8 +15,14 @@ pub mod image; pub mod oci_image; pub mod skopeo; +pub mod splitfdstream; pub mod tar; +pub use splitfdstream::{ + import_complete_image_from_splitfdstream, import_from_splitfdstream, + CompleteImageImportResult, +}; + use std::{collections::HashMap, io::Read, sync::Arc}; use anyhow::{bail, ensure, Context, Result}; @@ -39,7 +45,7 @@ pub use skopeo::{pull_image, PullResult}; type ContentAndVerity = (String, ObjectID); -fn layer_identifier(diff_id: &str) -> String { +pub(crate) fn layer_identifier(diff_id: &str) -> String { format!("oci-layer-{diff_id}") } diff --git a/crates/composefs-oci/src/oci_image.rs b/crates/composefs-oci/src/oci_image.rs index 256168ec..ec5c06ce 100644 --- a/crates/composefs-oci/src/oci_image.rs +++ b/crates/composefs-oci/src/oci_image.rs @@ -530,6 +530,53 @@ pub fn write_manifest( Ok((manifest_digest.to_string(), id)) } +/// Writes a manifest to the repository from raw JSON bytes. +/// +/// Unlike [`write_manifest`], this preserves the exact JSON bytes from the +/// original source, avoiding digest mismatches from re-serialization. +pub fn write_manifest_raw( + repo: &Arc>, + manifest_json: &[u8], + manifest_digest: &str, + config_verity: &ObjectID, + layer_verities: &HashMap, ObjectID>, + reference: Option<&str>, +) -> Result<(String, ObjectID)> { + let content_id = manifest_identifier(manifest_digest); + + if let Some(verity) = repo.has_stream(&content_id)? { + if let Some(name) = reference { + tag_image(repo, manifest_digest, name)?; + } + return Ok((manifest_digest.to_string(), verity)); + } + + let computed = hash(manifest_json); + ensure!( + manifest_digest == computed, + "Manifest digest mismatch: expected {manifest_digest}, got {computed}" + ); + + let manifest: ImageManifest = + serde_json::from_slice(manifest_json).context("parsing manifest JSON")?; + + let mut stream = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE); + + let config_key = format!("config:{}", manifest.config().digest()); + stream.add_named_stream_ref(&config_key, config_verity); + + for (diff_id, verity) in layer_verities { + stream.add_named_stream_ref(diff_id, verity); + } + + stream.write_external(manifest_json)?; + + let oci_ref = reference.map(oci_ref_path); + let id = repo.write_stream(stream, &content_id, oci_ref.as_deref())?; + + Ok((manifest_digest.to_string(), id)) +} + /// Checks if a manifest exists. pub fn has_manifest( repo: &Repository, diff --git a/crates/composefs-oci/src/splitfdstream.rs b/crates/composefs-oci/src/splitfdstream.rs new file mode 100644 index 00000000..3a90062f --- /dev/null +++ b/crates/composefs-oci/src/splitfdstream.rs @@ -0,0 +1,439 @@ +//! Client for importing container images and layers from a splitfdstream server. +//! +//! Implements the JSON-RPC 2.0 + SCM_RIGHTS protocol used by the Go +//! `jsonrpc-fdpass-go` transport. Each JSON message carries an `"fds"` field +//! that tells how many file descriptors accompany it via ancillary data. +//! FDs are batched (max 8 per sendmsg); overflow batches arrive as `"fds"` +//! notifications. + +use std::collections::HashMap; +use std::fs::File; +use std::io::{Cursor, IoSliceMut, Read, Write}; +use std::mem::MaybeUninit; +use std::os::fd::OwnedFd; +use std::os::unix::net::UnixStream; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{bail, Context, Result}; +use oci_spec::image::{ImageConfiguration, ImageManifest}; +use rustix::net::{RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags}; + +use composefs::fsverity::FsVerityHashValue; +use composefs::repository::Repository; +use splitfdstream::{Chunk, SplitfdstreamReader}; + +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; + +/// Read buffer size for recvmsg data. +const READ_BUFFER_SIZE: usize = 4096; + +/// Persistent receive state, equivalent to the Go `Receiver` struct. +struct Receiver<'a> { + socket: &'a UnixStream, + buffer: Vec, + fd_queue: Vec, +} + +impl<'a> Receiver<'a> { + fn new(socket: &'a UnixStream) -> Self { + Self { + socket, + buffer: Vec::new(), + fd_queue: Vec::new(), + } + } + + /// Read more data (and possibly FDs) from the socket. + /// Mirrors Go `Receiver.readMoreData` / `recvWithFDs`. + fn read_more_data(&mut self) -> Result { + let mut buf = [0u8; READ_BUFFER_SIZE]; + let mut iov = [IoSliceMut::new(&mut buf)]; + + // Ancillary buffer sized for 8 file descriptors (Go: MaxFDsPerMessage). + let mut cmsg_space = [MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(8))]; + let mut cmsg_buffer = RecvAncillaryBuffer::new(&mut cmsg_space); + + let result = rustix::net::recvmsg( + self.socket, + &mut iov, + &mut cmsg_buffer, + RecvFlags::CMSG_CLOEXEC, + ) + .context("recvmsg")?; + + let bytes = result.bytes; + if bytes > 0 { + self.buffer.extend_from_slice(&buf[..bytes]); + } + + for msg in cmsg_buffer.drain() { + if let RecvAncillaryMessage::ScmRights(fds) = msg { + self.fd_queue.extend(fds); + } + } + + Ok(bytes) + } + + /// Receive the next complete JSON-RPC message with its file descriptors. + /// Mirrors Go `Receiver.Receive` + `tryParseMessage`. + fn receive(&mut self) -> Result<(serde_json::Value, Vec)> { + loop { + // Try to parse a complete JSON value from buffered data. + if !self.buffer.is_empty() { + let mut stream = serde_json::Deserializer::from_slice(&self.buffer) + .into_iter::(); + match stream.next() { + Some(Ok(value)) => { + let consumed = stream.byte_offset(); + self.buffer.drain(..consumed); + + // Dequeue FDs indicated by the "fds" field. + let fd_count = + value.get("fds").and_then(|v| v.as_u64()).unwrap_or(0) as usize; + + // FDs may arrive across multiple recvmsg calls on + // SOCK_STREAM; keep reading until we have enough. + while self.fd_queue.len() < fd_count { + let n = self.read_more_data()?; + if n == 0 && self.fd_queue.len() < fd_count { + bail!( + "connection closed: message expects {} FDs \ + but only {} received", + fd_count, + self.fd_queue.len() + ); + } + } + + let fds = self.fd_queue.drain(..fd_count).collect(); + return Ok((value, fds)); + } + Some(Err(e)) if e.is_eof() => { /* incomplete, need more data */ } + Some(Err(e)) => return Err(e).context("JSON framing error"), + None => { /* empty, need more data */ } + } + } + + let n = self.read_more_data()?; + if n == 0 { + bail!("connection closed before complete message received"); + } + } + } +} + +// --- Object storage helpers --- + +fn store_fd_to_repo( + repo: &Repository, + fd: OwnedFd, + size: u64, +) -> Result { + let tmpfile = repo.create_object_tmpfile()?; + let mut src = File::from(fd); + let mut dst = File::from(tmpfile); + std::io::copy(&mut src, &mut dst).context("copying to repository")?; + repo.finalize_object_tmpfile(dst, size) +} + +// --- Public entry point --- + +/// Import a container layer from a splitfdstream server into the repository. +/// +/// Receives file descriptors from the server and stores each one to the +/// repository immediately (then closes it) to avoid hitting the per-process +/// file descriptor limit. +pub fn import_from_splitfdstream( + repo: &Arc>, + socket_path: impl AsRef, + diff_id: &str, + layer_id: Option<&str>, + parent_id: Option<&str>, + reference: Option<&str>, +) -> Result { + let effective_layer_id = layer_id.unwrap_or(diff_id); + let socket = + UnixStream::connect(socket_path.as_ref()).context("connecting to splitfdstream server")?; + + // Build request JSON. + let mut params = serde_json::json!({ "layerId": effective_layer_id }); + if let Some(pid) = parent_id { + params["parentId"] = serde_json::json!(pid); + } + let request = serde_json::json!({ + "jsonrpc": "2.0", + "method": "GetSplitFDStream", + "params": params, + "id": 1 + }); + + let request_bytes = serde_json::to_vec(&request).context("serializing request")?; + (&socket) + .write_all(&request_bytes) + .context("writing request")?; + + let mut receiver = Receiver::new(&socket); + + // Receive the initial response. + let (resp_value, initial_fds) = receiver.receive().context("receiving response")?; + + if let Some(err) = resp_value.get("error") { + let msg = err + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown error"); + bail!("splitfdstream server error: {msg}"); + } + + let result = resp_value + .get("result") + .context("missing 'result' in response")?; + let total_fds = result + .get("totalFDs") + .and_then(|v| v.as_u64()) + .context("missing 'totalFDs' in result")? as usize; + + // Process FDs as they arrive: store content FDs to the repo immediately + // so we don't hold hundreds of open file descriptors (which would exceed + // the per-process rlimit). + // + // allFDs[0] is a memfd with the stream data; allFDs[1:] are content FDs. + let mut stored_objects: Vec<(ObjectID, u64)> = Vec::new(); + let mut stream_data: Option> = None; + let mut fds_processed: usize = 0; + + let process_fd = |fd: OwnedFd, + fds_processed: &mut usize, + stream_data: &mut Option>, + stored_objects: &mut Vec<(ObjectID, u64)>| + -> Result<()> { + if *fds_processed == 0 { + let mut file = File::from(fd); + let mut data = Vec::new(); + file.read_to_end(&mut data) + .context("reading stream data from memfd")?; + *stream_data = Some(data); + } else { + let stat = rustix::fs::fstat(&fd).context("fstat on received fd")?; + let size = stat.st_size as u64; + let object_id = + store_fd_to_repo(repo, fd, size).context("storing fd to repository")?; + stored_objects.push((object_id, size)); + } + *fds_processed += 1; + Ok(()) + }; + + // Process the initial batch. + for fd in initial_fds { + process_fd( + fd, + &mut fds_processed, + &mut stream_data, + &mut stored_objects, + )?; + } + + // Receive follow-up "fds" notification batches until we have all FDs. + while fds_processed < total_fds { + let (_notif, batch_fds) = receiver.receive().context("receiving fd batch")?; + for fd in batch_fds { + process_fd( + fd, + &mut fds_processed, + &mut stream_data, + &mut stored_objects, + )?; + } + } + + let stream_data = stream_data.context("no stream data received (0 FDs)")?; + + // Parse the splitfdstream and build a composefs splitstream. + let content_identifier = crate::layer_identifier(diff_id); + repo.ensure_stream( + &content_identifier, + TAR_LAYER_CONTENT_TYPE, + |writer| { + let mut reader = SplitfdstreamReader::new(Cursor::new(&stream_data)); + while let Some(chunk) = reader.next_chunk().context("reading splitfdstream chunk")? { + match chunk { + Chunk::Inline(data) => { + writer.write_inline(data); + } + Chunk::External(fd_index) => { + let idx = fd_index as usize; + if idx >= stored_objects.len() { + bail!( + "splitfdstream references fd index {idx} \ + but only {} content fds received", + stored_objects.len() + ); + } + let (ref object_id, size) = stored_objects[idx]; + writer.add_external_size(size); + writer.write_reference(object_id.clone())?; + } + } + } + Ok(()) + }, + reference, + ) +} + +/// Result of importing a complete image via splitfdstream. +#[derive(Debug)] +pub struct CompleteImageImportResult { + /// SHA-256 digest of the manifest. + pub manifest_digest: String, + /// fs-verity hash of the stored manifest. + pub manifest_verity: ObjectID, + /// SHA-256 digest of the config. + pub config_digest: String, + /// fs-verity hash of the stored config. + pub config_verity: ObjectID, + /// Per-layer (diff_id, fs-verity hash) pairs. + pub layer_verities: Vec<(String, ObjectID)>, + /// Number of layers imported. + pub layers_imported: usize, + /// Total compressed size in bytes across all layers. + pub total_size_bytes: u64, +} + +/// Import a complete OCI image from a splitfdstream server into the repository. +/// +/// Fetches image metadata (manifest, config, layer IDs) via the `GetImage` RPC, +/// then imports each layer individually via `GetSplitFDStream` calls. +pub fn import_complete_image_from_splitfdstream( + repo: &Arc>, + socket_path: impl AsRef, + image_id: &str, + reference: Option<&str>, +) -> Result> { + let socket = UnixStream::connect(socket_path.as_ref()) + .context("connecting to splitfdstream server")?; + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "method": "GetImage", + "params": { "imageId": image_id }, + "id": 1 + }); + + let request_bytes = serde_json::to_vec(&request).context("serializing request")?; + (&socket) + .write_all(&request_bytes) + .context("writing request")?; + + let mut receiver = Receiver::new(&socket); + let (resp_value, _fds) = receiver.receive().context("receiving response")?; + + if let Some(err) = resp_value.get("error") { + let msg = err + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown error"); + bail!("splitfdstream server error: {msg}"); + } + + let result = resp_value + .get("result") + .context("missing 'result' in response")?; + + let manifest_json = result + .get("manifest") + .and_then(|v| v.as_str()) + .context("missing 'manifest' in response")?; + let config_json = result + .get("config") + .and_then(|v| v.as_str()) + .context("missing 'config' in response")?; + let storage_layer_ids: Vec = result + .get("layerDigests") + .and_then(|v| v.as_array()) + .context("missing 'layerDigests' in response")? + .iter() + .map(|v| v.as_str().unwrap_or("").to_string()) + .collect(); + + let manifest: ImageManifest = + serde_json::from_str(manifest_json).context("parsing image manifest")?; + let config: ImageConfiguration = + serde_json::from_str(config_json).context("parsing image configuration")?; + + let manifest_digest = crate::hash(manifest_json.as_bytes()); + let config_digest = crate::hash(config_json.as_bytes()); + + let layers = manifest.layers(); + let diff_ids = config.rootfs().diff_ids(); + + if storage_layer_ids.len() != layers.len() { + bail!( + "server returned {} storage layer IDs but manifest has {} layers", + storage_layer_ids.len(), + layers.len() + ); + } + if storage_layer_ids.len() != diff_ids.len() { + bail!( + "server returned {} storage layer IDs but config has {} diff IDs", + storage_layer_ids.len(), + diff_ids.len() + ); + } + + let mut imported_layers = Vec::new(); + let mut total_size_bytes = 0u64; + + for (i, (layer_desc, diff_id)) in layers.iter().zip(diff_ids.iter()).enumerate() { + let layer_verity = import_from_splitfdstream( + repo, + &socket_path, + diff_id, + Some(&storage_layer_ids[i]), + None, + None, + )?; + + imported_layers.push((diff_id.to_string(), layer_verity)); + total_size_bytes += layer_desc.size() as u64; + } + + let mut layer_refs = HashMap::new(); + for (diff_id, (_, layer_verity)) in diff_ids.iter().zip(&imported_layers) { + layer_refs.insert(diff_id.clone().into_boxed_str(), layer_verity.clone()); + } + + let (_, config_verity) = crate::write_config(repo, &config, layer_refs) + .context("storing image configuration")?; + + let mut layer_digest_to_verity = HashMap::new(); + for (layer_desc, (_, layer_verity)) in layers.iter().zip(&imported_layers) { + layer_digest_to_verity + .insert(layer_desc.digest().to_string().into_boxed_str(), layer_verity.clone()); + } + + let (_, manifest_verity) = crate::oci_image::write_manifest_raw( + repo, + manifest_json.as_bytes(), + &manifest_digest, + &config_verity, + &layer_digest_to_verity, + reference, + ) + .context("storing image manifest")?; + + let layers_imported = imported_layers.len(); + Ok(CompleteImageImportResult { + manifest_digest, + manifest_verity, + config_digest, + config_verity, + layer_verities: imported_layers, + layers_imported, + total_size_bytes, + }) +} From a6cef1ee4a9673d5c8c83d398dfd51604f3943a0 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 16:48:35 +0100 Subject: [PATCH 2/2] cfsctl: add import-image-splitfdstream command Add a CLI subcommand to import a complete OCI image from a containers-storage splitfdstream server over a UNIX socket. Assisted-by: Claude Opus 4.6 Signed-off-by: Giuseppe Scrivano --- Cargo.toml | 1 + crates/cfsctl/Cargo.toml | 3 ++- crates/cfsctl/src/main.rs | 31 +++++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c0eed637..ef6c1488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +splitfdstream = { version = "0.3.0", path = "crates/splitfdstream", default-features = false } [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/cfsctl/Cargo.toml b/crates/cfsctl/Cargo.toml index 850a9a14..ec7c67c0 100644 --- a/crates/cfsctl/Cargo.toml +++ b/crates/cfsctl/Cargo.toml @@ -13,7 +13,7 @@ version.workspace = true [features] default = ['pre-6.15', 'oci'] http = ['composefs-http'] -oci = ['composefs-oci'] +oci = ['composefs-oci', 'splitfdstream'] rhel9 = ['composefs/rhel9'] 'pre-6.15' = ['composefs/pre-6.15'] @@ -25,6 +25,7 @@ composefs = { workspace = true } composefs-boot = { workspace = true } composefs-oci = { workspace = true, optional = true } composefs-http = { workspace = true, optional = true } +splitfdstream = { workspace = true, optional = true } env_logger = { version = "0.11.0", default-features = false } hex = { version = "0.4.0", default-features = false } rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] } diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index 512817a2..708c786e 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -84,6 +84,16 @@ enum OciCommand { digest: String, name: Option, }, + /// Imports a complete image from a splitfdstream server into the repository. + ImportImageSplitfdstream { + /// Path to the splitfdstream server socket + socket: PathBuf, + /// The image ID (manifest digest or tag) + image_id: String, + /// Tag name for imported image + #[clap(long)] + tag: Option, + }, /// Lists the contents of a tar stream LsLayer { /// the name of the stream to list, either a stream ID in format oci-config-: or a reference in 'ref/' @@ -316,6 +326,27 @@ where )?; println!("{}", object_id.to_id()); } + OciCommand::ImportImageSplitfdstream { + socket, + image_id, + tag, + } => { + let result = composefs_oci::import_complete_image_from_splitfdstream( + &Arc::new(repo), + &socket, + &image_id, + tag.as_deref(), + )?; + + println!("Imported complete image:"); + println!(" Manifest: {}", result.manifest_digest); + println!(" Config: {}", result.config_digest); + println!(" Layers: {}", result.layers_imported); + println!(" Size: {} bytes", result.total_size_bytes); + if let Some(tag_name) = tag { + println!(" Tagged: {}", tag_name); + } + } OciCommand::LsLayer { name } => { composefs_oci::ls_layer(&repo, &name)?; }