
Conversation

@Ozodimgba
Contributor

Parallel encoding with Rayon threadpool

Problem

Pre-encoding in from_geyser() blocks validator threads during serialization. Even though encoding happens once per message, the validator must wait for each encode to complete before processing the next notification.

Solution

Move encoding from from_geyser() to geyser_loop() and parallelize it on a dedicated Rayon threadpool. Messages arrive unencoded, are batched (up to 31 messages), then encoded in parallel before broadcast. This moves CPU-intensive work off validator threads onto dedicated encoder threads.

Changes

New module: parallel.rs

  • ParallelEncoder struct with dedicated Rayon threadpool
  • Bridge thread connects async tokio to sync Rayon via blocking_recv
  • Oneshot channel returns encoded batch to async context
  • Small batch optimization: batches of fewer than 4 messages encode inline to avoid threadpool overhead (see the sketch after this list)
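
A minimal sketch of the pattern these bullets describe, assuming placeholder types (`Message`, `encode_message`, `EncodeRequest`) rather than the PR's actual message types:

```rust
use rayon::prelude::*;
use tokio::sync::{mpsc, oneshot};

// Placeholder payload and encode step for illustration only.
#[derive(Clone)]
pub struct Message(pub Vec<u8>);
fn encode_message(_msg: &mut Message) { /* pre-encode protobuf here */ }

struct EncodeRequest {
    batch: Vec<Message>,
    response: oneshot::Sender<Vec<Message>>,
}

pub struct ParallelEncoder {
    tx: mpsc::UnboundedSender<EncodeRequest>,
}

impl ParallelEncoder {
    pub fn new(num_threads: usize) -> Self {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("failed to build encoder threadpool");
        let (tx, mut rx) = mpsc::unbounded_channel::<EncodeRequest>();

        // Bridge thread: blocks on the async channel and fans work out to Rayon.
        std::thread::Builder::new()
            .name("encoder-bridge".into())
            .spawn(move || {
                while let Some(EncodeRequest { mut batch, response }) = rx.blocking_recv() {
                    if batch.len() < 4 {
                        // Small-batch path: encode inline, skip threadpool overhead.
                        batch.iter_mut().for_each(encode_message);
                    } else {
                        pool.install(|| batch.par_iter_mut().for_each(encode_message));
                    }
                    // Oneshot channel hands the encoded batch back to the async caller.
                    let _ = response.send(batch);
                }
            })
            .expect("failed to spawn encoder bridge thread");

        Self { tx }
    }

    pub async fn encode(&self, batch: Vec<Message>) -> Vec<Message> {
        let (response, rx) = oneshot::channel();
        let _ = self.tx.send(EncodeRequest { batch, response });
        rx.await.expect("encoder bridge thread exited")
    }
}
```

The oneshot reply keeps encode() awaitable from the tokio side while the heavy work runs on the Rayon pool.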

yellowstone-grpc-proto/src/plugin/message.rs

  • Remove TransactionEncoder::pre_encode() call from MessageTransactionInfo::from_geyser()
  • Remove AccountEncoder::pre_encode() call from MessageAccountInfo::from_geyser()

yellowstone-grpc-geyser/src/grpc.rs

  • Add parallel_encoder parameter to geyser_loop()
  • Create ParallelEncoder::new(4) at startup
  • Call parallel_encoder.encode(batch).await before broadcast (sketched below)
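
A hedged sketch of the call site, reusing the placeholder ParallelEncoder/Message from the sketch above; the batching loop and the limit of 31 follow the description, not the exact code:

```rust
use tokio::sync::{broadcast, mpsc};

// Sketch only: drain up to 31 pending messages, encode them in parallel,
// then broadcast the encoded batch to subscribers.
async fn geyser_loop(
    mut messages_rx: mpsc::UnboundedReceiver<Message>,
    broadcast_tx: broadcast::Sender<Vec<Message>>,
    parallel_encoder: ParallelEncoder,
) {
    while let Some(first) = messages_rx.recv().await {
        let mut batch = vec![first];
        while batch.len() < 31 {
            match messages_rx.try_recv() {
                Ok(msg) => batch.push(msg),
                Err(_) => break, // channel empty or closed: encode what we have
            }
        }
        let encoded = parallel_encoder.encode(batch).await;
        let _ = broadcast_tx.send(encoded);
    }
}
```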

yellowstone-grpc-geyser/src/metrics.rs

  • Add GEYSER_BATCH_SIZE histogram to track batch size distribution (see the sketch below)
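
A hedged sketch of such a histogram using the prometheus crate; the metric name follows the description, but the bucket boundaries are illustrative, not the values chosen in the PR:

```rust
use prometheus::{Histogram, HistogramOpts};

lazy_static::lazy_static! {
    // Bucket edges chosen for illustration only.
    pub static ref GEYSER_BATCH_SIZE: Histogram = Histogram::with_opts(
        HistogramOpts::new("geyser_batch_size", "Number of messages per encoded batch")
            .buckets(vec![1.0, 2.0, 4.0, 8.0, 16.0, 24.0, 31.0])
    )
    .unwrap();
}

// Observed once per batch in geyser_loop before encoding:
// GEYSER_BATCH_SIZE.observe(batch.len() as f64);
```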

Why this works

  • Arc::get_mut() succeeds because we have sole ownership of messages in geyser_loop before broadcast (illustrated in the sketch after this list)
  • 97.5% of batches hit 25-31 items (measured via GEYSER_BATCH_SIZE metric), justifying parallelization overhead
  • Dedicated 4-thread pool isolates encoding work from validator and tokio runtime
  • Validator threads return faster from notify_transaction/notify_account
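
A minimal illustration of the Arc::get_mut() point above, using a plain String in place of the real message payload:

```rust
use std::sync::Arc;

// While geyser_loop is the sole owner of the message (before broadcast),
// Arc::get_mut returns Some and the payload can be encoded in place.
fn pre_encode_in_place(msg: &mut Arc<String>) -> bool {
    match Arc::get_mut(msg) {
        Some(inner) => {
            inner.push_str(" [encoded]"); // stand-in for protobuf pre-encoding
            true
        }
        // After broadcast the Arc is shared and this branch would be taken.
        None => false,
    }
}

fn main() {
    let mut msg = Arc::new(String::from("tx"));
    assert!(pre_encode_in_place(&mut msg));

    let shared = Arc::clone(&msg);
    assert!(!pre_encode_in_place(&mut msg)); // two owners now: get_mut fails
    drop(shared);
}
```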

Testing

  • test_parallel_encoder_transactions: verifies transactions get encoded
  • test_parallel_encoder_accounts: verifies accounts get encoded
  • test_small_batch_uses_sync: verifies batches < 4 skip threadpool overhead
  • test_mixed_batch: verifies mixed transaction/account batches work

Expected impact

| Metric | Before | After |
| --- | --- | --- |
| Validator thread blocked on encode | Yes | No |
| Encoding parallelism | Sequential per message | Parallel per batch |
| Batch size visibility | None | GEYSER_BATCH_SIZE histogram |

@Ozodimgba Ozodimgba requested a review from lvboudre January 27, 2026 09:56
@Ozodimgba Ozodimgba marked this pull request as draft January 27, 2026 09:56

// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
let parallel_encoder = ParallelEncoder::new(4); // number should be in a config not hard coded

As you said, make this configurable please.
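
A possible shape for that config knob; the struct and field names here are placeholders, not the actual yellowstone config schema:

```rust
use serde::Deserialize;

// Hypothetical config entry with a default of 4 encoder threads.
#[derive(Debug, Deserialize)]
pub struct ConfigGrpc {
    #[serde(default = "ConfigGrpc::default_encoder_threads")]
    pub encoder_threads: usize,
}

impl ConfigGrpc {
    const fn default_encoder_threads() -> usize {
        4
    }
}

// At startup, instead of the hard-coded 4:
// let parallel_encoder = ParallelEncoder::new(config.grpc.encoder_threads);
```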

let (tx, rx) = mpsc::unbounded_channel();

std::thread::Builder::new()
.name("encoder-bridge".into())

Can you name it geyser-encoding-bridge instead, so that when we list threads we can grep all geyser-related threads using geyser*, please?

});

let _ = response.send(batch);
}

Can you add a log::info!("exiting encoder bridge loop") when you leave, please, to facilitate troubleshooting.


let (tx, rx) = mpsc::unbounded_channel();

std::thread::Builder::new()

Spawning a thread without ever joining the handle is typically a code smell.
In the case of the geyser plugin we must make sure we close all threads before leaving the "uninstall" plugin method, otherwise the .so binary will still be mapped in memory.
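
A sketch of what the reviewer is asking for: keep the bridge-thread handle on the plugin, drop the sender to stop the loop, then join during unload. The PluginInner and shutdown names are illustrative of the geyser plugin lifecycle, not the PR's actual structure:

```rust
use std::thread::JoinHandle;

// Illustrative only: hold the bridge-thread handle so unload can join it.
struct PluginInner {
    encoder: Option<ParallelEncoder>,
    encoder_handle: Option<JoinHandle<()>>,
}

impl PluginInner {
    fn shutdown(&mut self) {
        // Dropping the encoder drops its sender, so blocking_recv() in the
        // bridge thread returns None and the loop exits.
        drop(self.encoder.take());
        if let Some(handle) = self.encoder_handle.take() {
            let _ = handle.join();
        }
        // With all threads joined, unloading the .so is safe.
    }
}
```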


// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
let parallel_encoder = ParallelEncoder::new(4); // number should be in a config not hard coded

Make sure you can get a hold of the parallel_encoder so we can gracefully shut it down when unloading a geyser plugin.

}

impl ParallelEncoder {
pub fn new(num_threads: usize) -> Self {

Make sure you also return the JoinHandle so code elsewhere can join it during the unload procedure.
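
One way to do that, continuing the placeholder sketch above (same imports, EncodeRequest, and encode_message): return the handle alongside the encoder so the unload path can join it, and log when the bridge loop exits as requested earlier.

```rust
use std::thread::JoinHandle;

impl ParallelEncoder {
    // Sketch: same construction as before, but hands the bridge-thread
    // handle back to the caller for joining at plugin unload.
    pub fn new_with_handle(num_threads: usize) -> (Self, JoinHandle<()>) {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("failed to build encoder threadpool");
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EncodeRequest>();

        let handle = std::thread::Builder::new()
            .name("encoder-bridge".into())
            .spawn(move || {
                while let Some(EncodeRequest { mut batch, response }) = rx.blocking_recv() {
                    pool.install(|| batch.par_iter_mut().for_each(encode_message));
                    let _ = response.send(batch);
                }
                log::info!("exiting encoder bridge loop");
            })
            .expect("failed to spawn encoder bridge thread");

        (Self { tx }, handle)
    }
}
```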
