1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }
splitfdstream = { version = "0.3.0", path = "crates/splitfdstream", default-features = false }

[profile.dev.package.sha2]
# this is *really* slow otherwise
3 changes: 2 additions & 1 deletion crates/cfsctl/Cargo.toml
@@ -13,7 +13,7 @@ version.workspace = true
[features]
default = ['pre-6.15', 'oci']
http = ['composefs-http']
oci = ['composefs-oci']
oci = ['composefs-oci', 'splitfdstream']
rhel9 = ['composefs/rhel9']
'pre-6.15' = ['composefs/pre-6.15']

@@ -25,6 +25,7 @@ composefs = { workspace = true }
composefs-boot = { workspace = true }
composefs-oci = { workspace = true, optional = true }
composefs-http = { workspace = true, optional = true }
splitfdstream = { workspace = true, optional = true }
env_logger = { version = "0.11.0", default-features = false }
hex = { version = "0.4.0", default-features = false }
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }
32 changes: 32 additions & 0 deletions crates/cfsctl/src/main.rs
@@ -84,6 +84,21 @@ enum OciCommand {
digest: String,
name: Option<String>,
},
/// Imports a layer from a splitfdstream server into the repository.
ImportLayerSplitfdstream {
/// Path to the splitfdstream server socket
socket: PathBuf,
/// The layer digest (e.g., sha256:abc123...)
digest: String,
/// Optional reference name for the layer
name: Option<String>,
/// Layer ID to request from splitfdstream server (defaults to digest)
#[clap(long)]
layer_id: Option<String>,
/// Parent layer ID for delta computation
#[clap(long)]
parent_id: Option<String>,
},
/// Lists the contents of a tar stream
LsLayer {
/// the name of the stream to list, either a stream ID in format oci-config-<hash_type>:<hash_digest> or a reference in 'ref/'
@@ -316,6 +331,23 @@
)?;
println!("{}", object_id.to_id());
}
OciCommand::ImportLayerSplitfdstream {
socket,
digest,
name,
layer_id,
parent_id,
} => {
let object_id = composefs_oci::import_from_splitfdstream(
&Arc::new(repo),
&socket,
&digest,
layer_id.as_deref(),
parent_id.as_deref(),
name.as_deref(),
)?;
println!("{}", object_id.to_id());
}
OciCommand::LsLayer { name } => {
composefs_oci::ls_layer(&repo, &name)?;
}
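With clap's default kebab-case renaming, the new subcommand should surface as roughly `cfsctl oci import-layer-splitfdstream <SOCKET> <DIGEST> [NAME] [--layer-id <ID>] [--parent-id <ID>]`; the exact invocation is inferred from the derive definitions above, not stated in the PR.
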
4 changes: 3 additions & 1 deletion crates/composefs-oci/Cargo.toml
@@ -16,11 +16,13 @@ fn-error-context = "0.2"
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
bytes = { version = "1", default-features = false }
composefs = { workspace = true }
splitfdstream = { workspace = true }
containers-image-proxy = { version = "0.9.2", default-features = false }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] }
oci-spec = { version = "0.8.0", default-features = false }
rustix = { version = "1.0.0", features = ["fs"] }
rustix = { version = "1.0.0", features = ["fs", "net"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
sha2 = { version = "0.10.1", default-features = false }
tar = { version = "0.4.38", default-features = false }
tokio = { version = "1.24.2", features = ["rt-multi-thread"] }
5 changes: 4 additions & 1 deletion crates/composefs-oci/src/lib.rs
@@ -15,8 +15,11 @@
pub mod image;
pub mod oci_image;
pub mod skopeo;
pub mod splitfdstream;
pub mod tar;

pub use splitfdstream::import_from_splitfdstream;

use std::{collections::HashMap, io::Read, sync::Arc};

use anyhow::{bail, ensure, Context, Result};
@@ -39,7 +42,7 @@ pub use skopeo::{pull_image, PullResult};

type ContentAndVerity<ObjectID> = (String, ObjectID);

fn layer_identifier(diff_id: &str) -> String {
pub(crate) fn layer_identifier(diff_id: &str) -> String {
format!("oci-layer-{diff_id}")
}

285 changes: 285 additions & 0 deletions crates/composefs-oci/src/splitfdstream.rs
@@ -0,0 +1,285 @@
//! Client for importing container layers from a splitfdstream server.
//!
//! Implements the JSON-RPC 2.0 + SCM_RIGHTS protocol used by the Go
//! `jsonrpc-fdpass-go` transport. Each JSON message carries an `"fds"` field
//! that tells how many file descriptors accompany it via ancillary data.
//! FDs are batched (max 8 per sendmsg); overflow batches arrive as `"fds"`
//! notifications.
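//!
//! A sketch of the exchange, reconstructed from the handling below (the
//! server-side schema is inferred from this client, not verified):
//!
//! ```text
//! -> {"jsonrpc":"2.0","method":"GetSplitFDStream",
//!     "params":{"layerId":"sha256:..."},"id":1}
//! <- {"jsonrpc":"2.0","result":{"totalFDs":42},"fds":8,"id":1}  +8 FDs
//! <- {"jsonrpc":"2.0","method":"fds","fds":8}                   +8 FDs
//! <- ... further "fds" notifications until totalFDs have arrived
//! ```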

use std::fs::File;
use std::io::{Cursor, IoSliceMut, Read, Write};
use std::mem::MaybeUninit;
use std::os::fd::OwnedFd;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use rustix::net::{RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags};

use composefs::fsverity::FsVerityHashValue;
use composefs::repository::Repository;
use splitfdstream::{Chunk, SplitfdstreamReader};

use crate::skopeo::TAR_LAYER_CONTENT_TYPE;

// --- Transport layer (matches Go jsonrpc-fdpass-go) ---

/// Read buffer size for recvmsg data (Go: ReadBufferSize).
const READ_BUFFER_SIZE: usize = 4096;

/// Persistent receive state, equivalent to the Go `Receiver` struct.
struct Receiver<'a> {
socket: &'a UnixStream,
buffer: Vec<u8>,
fd_queue: Vec<OwnedFd>,
}

impl<'a> Receiver<'a> {
fn new(socket: &'a UnixStream) -> Self {
Self {
socket,
buffer: Vec::new(),
fd_queue: Vec::new(),
}
}

/// Read more data (and possibly FDs) from the socket.
/// Mirrors Go `Receiver.readMoreData` / `recvWithFDs`.
fn read_more_data(&mut self) -> Result<usize> {
let mut buf = [0u8; READ_BUFFER_SIZE];
let mut iov = [IoSliceMut::new(&mut buf)];

// Ancillary buffer sized for 8 file descriptors (Go: MaxFDsPerMessage).
let mut cmsg_space = [MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(8))];
let mut cmsg_buffer = RecvAncillaryBuffer::new(&mut cmsg_space);

let result = rustix::net::recvmsg(
self.socket,
&mut iov,
&mut cmsg_buffer,
RecvFlags::CMSG_CLOEXEC,
)
.context("recvmsg")?;

let bytes = result.bytes;
if bytes > 0 {
self.buffer.extend_from_slice(&buf[..bytes]);
}

for msg in cmsg_buffer.drain() {
if let RecvAncillaryMessage::ScmRights(fds) = msg {
self.fd_queue.extend(fds);
}
}

Ok(bytes)
}

/// Receive the next complete JSON-RPC message with its file descriptors.
/// Mirrors Go `Receiver.Receive` + `tryParseMessage`.
fn receive(&mut self) -> Result<(serde_json::Value, Vec<OwnedFd>)> {
loop {
// Try to parse a complete JSON value from buffered data.
if !self.buffer.is_empty() {
let mut stream = serde_json::Deserializer::from_slice(&self.buffer)
.into_iter::<serde_json::Value>();
match stream.next() {
Some(Ok(value)) => {
let consumed = stream.byte_offset();
self.buffer.drain(..consumed);

// Dequeue FDs indicated by the "fds" field.
let fd_count =
value.get("fds").and_then(|v| v.as_u64()).unwrap_or(0) as usize;

// FDs may arrive across multiple recvmsg calls on
// SOCK_STREAM; keep reading until we have enough.
while self.fd_queue.len() < fd_count {
let n = self.read_more_data()?;
if n == 0 && self.fd_queue.len() < fd_count {
bail!(
"connection closed: message expects {} FDs \
but only {} received",
fd_count,
self.fd_queue.len()
);
}
}

let fds = self.fd_queue.drain(..fd_count).collect();
return Ok((value, fds));
}
Some(Err(e)) if e.is_eof() => { /* incomplete, need more data */ }
Some(Err(e)) => return Err(e).context("JSON framing error"),
None => { /* empty, need more data */ }
}
}

let n = self.read_more_data()?;
if n == 0 {
bail!("connection closed before complete message received");
}
}
}
}
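
// Standalone illustration of the framing trick above: serde_json's
// streaming deserializer stops after one complete value and reports the
// byte offset it consumed, so the wire needs no length prefix. (Example
// only, not part of the client.)
#[cfg(test)]
mod framing_example {
    #[test]
    fn one_value_at_a_time() {
        // Two concatenated JSON messages, as they might sit in `buffer`.
        let buf = br#"{"id":1,"fds":2}{"method":"fds","fds":3}"#;
        let mut stream =
            serde_json::Deserializer::from_slice(buf).into_iter::<serde_json::Value>();
        let first = stream.next().unwrap().unwrap();
        // Everything before byte_offset() belongs to the first message
        // and can be drained from the buffer.
        assert_eq!(stream.byte_offset(), br#"{"id":1,"fds":2}"#.len());
        assert_eq!(first["fds"], 2);
    }
}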

// --- Object storage helpers ---

fn store_fd_to_repo<ObjectID: FsVerityHashValue>(
repo: &Repository<ObjectID>,
fd: OwnedFd,
size: u64,
) -> Result<ObjectID> {
let tmpfile = repo.create_object_tmpfile()?;
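    // `File::from(fd)` takes ownership of the received descriptor, so it
    // is closed automatically when `src` drops at the end of this call.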
let mut src = File::from(fd);
let mut dst = File::from(tmpfile);
std::io::copy(&mut src, &mut dst).context("copying to repository")?;
repo.finalize_object_tmpfile(dst, size)
}

// --- Public entry point ---

/// Import a container layer from a splitfdstream server into the repository.
///
/// Receives file descriptors from the server and stores each one to the
/// repository immediately (then closes it) to avoid hitting the per-process
/// file descriptor limit.
pub fn import_from_splitfdstream<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
socket_path: impl AsRef<Path>,
diff_id: &str,
layer_id: Option<&str>,
parent_id: Option<&str>,
reference: Option<&str>,
) -> Result<ObjectID> {
let effective_layer_id = layer_id.unwrap_or(diff_id);
let socket =
UnixStream::connect(socket_path.as_ref()).context("connecting to splitfdstream server")?;

// Build request JSON.
let mut params = serde_json::json!({ "layerId": effective_layer_id });
if let Some(pid) = parent_id {
params["parentId"] = serde_json::json!(pid);
}
let request = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetSplitFDStream",
Review comment (Collaborator):

I think this is fine for a PoC; however, bigger picture, I think what we need to do is add this method to https://github.com/containers/skopeo/blob/main/cmd/skopeo/proxy.go and then wrap it with https://github.com/bootc-dev/containers-image-proxy-rs/, right?

(And maybe a side quest in there is "port proxy to jsonrpc-fdpass" as a new API.)

Review comment (Collaborator):

Or in other words: look at what I did in #218. The key thing here, I think, is fast-pathing the containers-storage: transport using this.

In order to do that, we really do need not just an "import a single layer" CLI verb but the whole "fetch manifest + config + layers" flow, right?

"params": params,
"id": 1
});

let request_bytes = serde_json::to_vec(&request).context("serializing request")?;
(&socket)
.write_all(&request_bytes)
.context("writing request")?;

let mut receiver = Receiver::new(&socket);

// Receive the initial response.
let (resp_value, initial_fds) = receiver.receive().context("receiving response")?;

if let Some(err) = resp_value.get("error") {
let msg = err
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown error");
bail!("splitfdstream server error: {msg}");
}

let result = resp_value
.get("result")
.context("missing 'result' in response")?;
let total_fds = result
.get("totalFDs")
.and_then(|v| v.as_u64())
.context("missing 'totalFDs' in result")? as usize;

// Process FDs as they arrive: store content FDs to the repo immediately
// so we don't hold hundreds of open file descriptors (which would exceed
// the per-process rlimit).
//
// allFDs[0] is a memfd with the stream data; allFDs[1:] are content FDs.
let mut stored_objects: Vec<(ObjectID, u64)> = Vec::new();
let mut stream_data: Option<Vec<u8>> = None;
let mut fds_processed: usize = 0;

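    // The helper takes its mutable state as explicit `&mut` parameters
    // rather than capturing it, so the same locals remain directly
    // usable by the receive loops below between calls.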
let process_fd = |fd: OwnedFd,
fds_processed: &mut usize,
stream_data: &mut Option<Vec<u8>>,
stored_objects: &mut Vec<(ObjectID, u64)>|
-> Result<()> {
if *fds_processed == 0 {
let mut file = File::from(fd);
let mut data = Vec::new();
file.read_to_end(&mut data)
.context("reading stream data from memfd")?;
*stream_data = Some(data);
} else {
let stat = rustix::fs::fstat(&fd).context("fstat on received fd")?;
let size = stat.st_size as u64;
let object_id =
store_fd_to_repo(repo, fd, size).context("storing fd to repository")?;
stored_objects.push((object_id, size));
}
*fds_processed += 1;
Ok(())
};

// Process the initial batch.
for fd in initial_fds {
process_fd(
fd,
&mut fds_processed,
&mut stream_data,
&mut stored_objects,
)?;
}

// Receive follow-up "fds" notification batches until we have all FDs.
while fds_processed < total_fds {
let (_notif, batch_fds) = receiver.receive().context("receiving fd batch")?;
for fd in batch_fds {
process_fd(
fd,
&mut fds_processed,
&mut stream_data,
&mut stored_objects,
)?;
}
}

let stream_data = stream_data.context("no stream data received (0 FDs)")?;

// Parse the splitfdstream and build a composefs splitstream.
let content_identifier = crate::layer_identifier(diff_id);
repo.ensure_stream(
&content_identifier,
TAR_LAYER_CONTENT_TYPE,
|writer| {
let mut reader = SplitfdstreamReader::new(Cursor::new(&stream_data));
while let Some(chunk) = reader.next_chunk().context("reading splitfdstream chunk")? {
match chunk {
Chunk::Inline(data) => {
writer.write_inline(data);
}
Chunk::External(fd_index) => {
let idx = fd_index as usize;
if idx >= stored_objects.len() {
bail!(
"splitfdstream references fd index {idx} \
but only {} content fds received",
stored_objects.len()
);
}
let (ref object_id, size) = stored_objects[idx];
writer.add_external_size(size);
writer.write_reference(object_id.clone())?;
}
}
}
Ok(())
},
reference,
)
}
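
For reference, a minimal caller sketch under stated assumptions: the repository handle comes from elsewhere, `Sha256HashValue` stands in for the object ID type, and the socket path, digest, and reference name are placeholders.

use std::sync::Arc;

use composefs::{fsverity::Sha256HashValue, repository::Repository};

fn import_example(repo: Arc<Repository<Sha256HashValue>>) -> anyhow::Result<()> {
    let object_id = composefs_oci::import_from_splitfdstream(
        &repo,
        "/run/splitfdstream.sock", // placeholder socket path
        "sha256:abc123...",        // the layer's diff_id
        None,                      // layer_id defaults to the diff_id
        None,                      // no parent layer for delta
        Some("example"),           // optional reference name
    )?;
    println!("{}", object_id.to_id());
    Ok(())
}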