diff --git a/lib/rex/io/fiber_scheduler.rb b/lib/rex/io/fiber_scheduler.rb new file mode 100644 index 0000000..8b7123f --- /dev/null +++ b/lib/rex/io/fiber_scheduler.rb @@ -0,0 +1,156 @@ +require 'fiber' +require 'io/nonblock' +require 'rex/compat' + +module Rex +module IO + + # This fiber scheduler is heavily base on + # https://blog.monotone.dev/ruby/2020/12/25/ruby-3-fiber.html + class FiberScheduler + def initialize + @readable = {} + @writable = {} + @waiting = {} + @ready = [] + @pending = [] + @blocking = 0 + @urgent = Rex::Compat.pipe + @mutex = Mutex.new + end + + # This allows the fiber to be scheduled in the #run thread from another thread + def schedule_fiber(&block) + @mutex.synchronize do + @pending << block + end + # Wake up the scheduler + @urgent.last.write_nonblock('.') rescue nil + end + + def run + while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? or @ready.any? or @pending.any? + # Start any pending fibers + pending_blocks = [] + @mutex.synchronize do + pending_blocks = @pending.dup + @pending.clear + end + + pending_blocks.each do |block| + fiber = Fiber.new(blocking: false, &block) + fiber.resume + end + + begin + readable, writable = ::IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0.1) + rescue ::IOError + cleanup_closed_ios + next + end + + # Drain the urgent pipe + if readable&.include?(@urgent.first) + @urgent.first.read_nonblock(1024) rescue nil + end + + readable&.each do |io| + next if io == @urgent.first + + if fiber = @readable.delete(io) + fiber.resume + end + end + + writable&.each do |io| + if fiber = @writable.delete(io) + fiber.resume + end + end + + @waiting.keys.each do |fiber| + if current_time > @waiting[fiber] + @waiting.delete(fiber) + fiber.resume + end + end + + ready, @ready = @ready, [] + ready.each do |fiber| + fiber.resume + end + end + end + + def io_wait(io, events, timeout) + unless (events & ::IO::READABLE).zero? + @readable[io] = Fiber.current + end + unless (events & ::IO::WRITABLE).zero? + @writable[io] = Fiber.current + end + + Fiber.yield + events + end + + def kernel_sleep(duration = nil) + block(:sleep, duration) + true + end + + def block(blocker, timeout = nil) + if timeout + @waiting[Fiber.current] = current_time + timeout + begin + Fiber.yield + ensure + @waiting.delete(Fiber.current) + end + else + @blocking += 1 + begin + Fiber.yield + ensure + @blocking -= 1 + end + end + end + + def unblock(blocker, fiber) + @ready << fiber + io = @urgent.last + io.write_nonblock('.') + end + + def close + run + @urgent.each(&:close) + @urgent = nil + end + + def closed? + @urgent.nil? + end + + def fiber(&block) + fiber = Fiber.new(blocking: false, &block) + fiber.resume + fiber + end + + private + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def cleanup_closed_ios + @readable.delete_if { |io, _| io.closed? rescue true } + @writable.delete_if { |io, _| io.closed? rescue true } + @waiting.delete_if { |fiber, _| !fiber || !fiber.alive? rescue true } + end + end + +end +end \ No newline at end of file diff --git a/lib/rex/io/relay_manager.rb b/lib/rex/io/relay_manager.rb new file mode 100644 index 0000000..199fe95 --- /dev/null +++ b/lib/rex/io/relay_manager.rb @@ -0,0 +1,113 @@ +require 'rex/io/fiber_scheduler' + +module Rex +module IO + +# An IO RelayManager which will read data from a socket and write it to a sink +# using a background thread. +class RelayManager + attr_reader :thread + + def initialize + @thread = nil + @scheduler = FiberScheduler.new + end + + # Add a IO relay to the manager. This will start relaying data from the source + # socket to the destination sink immediately. An optional "on_exit" callback + # can be provided which will be called when the socket is closed. + # + # @param sock [::Socket] The source socket that data will be read from. + # @param sink [#write, #call] A data destination where read data will be sent. It is called + # with one parameter, the data to be transferred. If the object exposes a #write method, it will + # be called repeatedly all data is processed and the return value will be used to determine how + # much of the data was written. If the object exposes a #call method, it will be called once and + # must handle processing all the data it is provided. + # @param name [String] A human-friendly name for the relay used in debug output. + # @param on_exit [#call] A callback to be invoked when sink can no longer be read from. + def add_relay(sock, sink: nil, name: nil, on_exit: nil) + @scheduler.schedule_fiber do + relay_fiber(sock, sink, name, on_exit: on_exit) + end + + start unless running? + end + + # Write all data to the specified IO. This is intended to be used in scenarios + # where partial writes are possible but not desirable. + # + # @param io [#write] An object to write the data to. It must return a number indicating + # how many bytes of the provided data were processed. + # @param data [String] The data that should be written. + def self.io_write_all(io, data) + offset = 0 + while offset < data.bytesize + written = io.write(data.byteslice(offset..-1)) + offset += written + end + data.bytesize + end + + private + + def running? + @thread && @thread.alive? + end + + def start + return false if running? + + @thread = Thread.new { run } + true + end + + def run + old_scheduler = Fiber.scheduler + # A fiber scheduler can be set per-thread + Fiber.set_scheduler(@scheduler) + + # Run the scheduler (blocks here) + @scheduler.run + ensure + Fiber.set_scheduler(old_scheduler) + end + + def relay_fiber(sock, sink, name, on_exit: nil) + loop do + break if sock.closed? + + buf = sock.readpartial(32_768) + write_to_sink(sink, buf) + end + rescue EOFError + nil + rescue => e + message = "#{self.class.name}#relay_fiber(name: #{name}): #{e.class} #{e}" + if defined?(elog) # elog is defined by framework, otherwise use stderr + elog(message, error: e) + else + $stderr.puts message + end + ensure + unless sock.closed? + sock.close rescue nil + end + + on_exit.call if on_exit + end + + def write_to_sink(sink, data) + if sink.respond_to?(:write) + self.class.io_write_all(sink, data) + + elsif sink.respond_to?(:call) + sink.call(data) + + else + raise ArgumentError, "Unsupported sink type: #{sink.inspect}" + end + end +end + +end +end \ No newline at end of file diff --git a/lib/rex/io/socket_abstraction.rb b/lib/rex/io/socket_abstraction.rb index 35d8af2..f29667f 100644 --- a/lib/rex/io/socket_abstraction.rb +++ b/lib/rex/io/socket_abstraction.rb @@ -1,7 +1,8 @@ # -*- coding: binary -*- require 'socket' -require 'fcntl' + +require 'rex/io/relay_manager' module Rex module IO @@ -12,6 +13,7 @@ module IO # ### module SocketAbstraction + ### # # Extension information for required Stream interface. @@ -54,7 +56,7 @@ def initialize_abstraction def cleanup_abstraction lsock.close if lsock and !lsock.closed? - monitor_thread.join if monitor_thread&.alive? + monitor_thread.join if monitor_thread&.alive? && monitor_thread&.object_id != Thread.current.object_id rsock.close if rsock and !rsock.closed? @@ -114,105 +116,27 @@ def localinfo # attr_reader :rsock - module MonitoredRSock - def close - @close_requested = true - @monitor_thread.join - nil - end + protected - def sysclose - self.class.instance_method(:close).bind(self).call + def monitor_rsock(name = 'MonitorRemote') + if respond_to?(:close_write) + on_exit = method(:close_write) + else + on_exit = nil end - attr_reader :close_requested - attr_writer :monitor_thread + monitor_sock(rsock, sink: self, name: name, on_exit: on_exit) end - protected + def monitor_sock(sock, sink:, name:, on_exit: nil) + @relay_manager ||= Rex::IO::RelayManager.new + @relay_manager.add_relay(sock, sink: sink, name: name, on_exit: on_exit) + end - def monitor_rsock(threadname = 'SocketMonitorRemote') - rsock.extend(MonitoredRSock) - rsock.monitor_thread = self.monitor_thread = Rex::ThreadFactory.spawn(threadname, false) do - loop do - closed = rsock.nil? || rsock.close_requested - - if closed - wlog('monitor_rsock: the remote socket has been closed, exiting loop') - break - end - - buf = nil - - begin - s = Rex::ThreadSafe.select([rsock], nil, nil, 0.2) - next if s.nil? || s[0].nil? - rescue Exception => e - wlog("monitor_rsock: exception during select: #{e.class} #{e}") - closed = true - end - - unless closed - begin - buf = rsock.sysread(32_768) - if buf.nil? - closed = true - wlog('monitor_rsock: closed remote socket due to nil read') - end - rescue EOFError => e - closed = true - dlog('monitor_rsock: EOF in rsock') - rescue ::Exception => e - closed = true - wlog("monitor_rsock: exception during read: #{e.class} #{e}") - end - end - - unless closed - total_sent = 0 - total_length = buf.length - while total_sent < total_length - begin - data = buf[total_sent, buf.length] - - # Note that this must be write() NOT syswrite() or put() or anything like it. - # Using syswrite() breaks SSL streams. - sent = write(data) - - # sf: Only remove the data off the queue is write was successful. - # This way we naturally perform a resend if a failure occurred. - # Catches an edge case with meterpreter TCP channels where remote send - # fails gracefully and a resend is required. - if sent.nil? - closed = true - wlog('monitor_rsock: failed writing, socket must be dead') - break - elsif sent > 0 - total_sent += sent - end - rescue ::IOError, ::EOFError => e - closed = true - wlog("monitor_rsock: exception during write: #{e.class} #{e}") - break - end - end - end - - next unless closed - - begin - close_write if respond_to?('close_write') - rescue StandardError - end - - break - end - - rsock.sysclose - end + def monitor_thread + @relay_manager&.thread end - attr_accessor :monitor_thread attr_writer :lsock, :rsock end end diff --git a/spec/rex/io/fiber_scheduler_spec.rb b/spec/rex/io/fiber_scheduler_spec.rb new file mode 100644 index 0000000..bd6c2ce --- /dev/null +++ b/spec/rex/io/fiber_scheduler_spec.rb @@ -0,0 +1,609 @@ +require 'rex/compat' +require 'rex/io/fiber_scheduler' + +RSpec.describe Rex::IO::FiberScheduler do + let(:scheduler) { described_class.new } + + after do + if scheduler && !scheduler.closed? + scheduler.close + end + end + + describe '#initialize' do + it 'initializes with empty state' do + expect(scheduler.instance_variable_get(:@readable)).to eq({}) + expect(scheduler.instance_variable_get(:@writable)).to eq({}) + expect(scheduler.instance_variable_get(:@waiting)).to eq({}) + expect(scheduler.instance_variable_get(:@ready)).to eq([]) + expect(scheduler.instance_variable_get(:@pending)).to eq([]) + expect(scheduler.instance_variable_get(:@blocking)).to eq(0) + end + + it 'creates an urgent pipe for signaling' do + urgent = scheduler.instance_variable_get(:@urgent) + expect(urgent).to be_a(Array) + expect(urgent.first).to be_a(IO) + expect(urgent.last).to be_a(IO) + end + + it 'creates a mutex for thread safety' do + mutex = scheduler.instance_variable_get(:@mutex) + expect(mutex).to be_a(Mutex) + end + end + + describe '#fiber' do + it 'creates and resumes a new non-blocking fiber' do + executed = false + fiber = scheduler.fiber { executed = true } + + expect(fiber).to be_a(Fiber) + expect(executed).to be true + end + + it 'returns the fiber' do + fiber = scheduler.fiber { :result } + expect(fiber).to be_a(Fiber) + end + end + + describe '#schedule_fiber' do + it 'schedules a fiber for later execution' do + executed = false + + scheduler.schedule_fiber { executed = true } + + expect(executed).to be false + pending = scheduler.instance_variable_get(:@pending) + expect(pending.size).to eq(1) + end + + it 'wakes up the scheduler via urgent pipe' do + urgent_pipe = scheduler.instance_variable_get(:@urgent) + + scheduler.schedule_fiber { :test } + + # The urgent pipe should have data + readable, = IO.select([urgent_pipe.first], nil, nil, 0) + expect(readable).to include(urgent_pipe.first) + end + + it 'is thread-safe' do + counter = Mutex.new + completed = [] + + threads = 10.times.map do + Thread.new do + 100.times do |i| + scheduler.schedule_fiber do + scheduler.kernel_sleep(0.1) + counter.synchronize { completed << true } + end + end + end + end + + threads.each(&:join) + + # All fibers should be scheduled + pending = scheduler.instance_variable_get(:@pending) + expect(pending.size).to eq(1000) + + # Now run them concurrently + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + scheduler.run + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time + + # If sequential, would take 100 seconds (1000 * 0.1) + # Concurrent should be much faster (slightly more than 0.1) + expect(elapsed).to be > 0.1 + expect(elapsed).to be < 1.0 + expect(completed.size).to eq(1000) + end + end + + describe '#run' do + it 'processes pending fibers' do + results = [] + + scheduler.schedule_fiber { results << 1 } + scheduler.schedule_fiber { results << 2 } + scheduler.schedule_fiber { results << 3 } + + run_thread = Thread.new { scheduler.run } + run_thread.join(1) + + expect(results).to contain_exactly(1, 2, 3) + end + + it 'exits when all work is complete' do + scheduler.schedule_fiber { :done } + + expect { scheduler.run }.not_to raise_error + end + + it 'processes readable IO events' do + r, w = Rex::Compat.pipe + result = nil + + scheduler.schedule_fiber do + scheduler.io_wait(r, IO::READABLE, nil) + result = r.read_nonblock(100) + end + + run_thread = Thread.new { scheduler.run } + + w.write("test data") + w.close + run_thread.join(1) + + expect(result).to eq("test data") + + r.close + end + + it 'processes writable IO events' do + r, w = Rex::Compat.pipe + result = nil + + scheduler.schedule_fiber do + scheduler.io_wait(w, IO::WRITABLE, nil) + result = :written + w.write("data") + end + + run_thread = Thread.new { scheduler.run } + run_thread.join(1) + + expect(result).to eq(:written) + + r.close + w.close + end + + it 'processes waiting fibers after timeout' do + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + resume_time = nil + + scheduler.schedule_fiber do + scheduler.kernel_sleep(0.2) + resume_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + scheduler.run + + elapsed = resume_time - start_time + expect(elapsed).to be >= 0.2 + expect(elapsed).to be < 0.5 + end + + it 'processes ready fibers' do + result = nil + + scheduler.schedule_fiber do + fiber = Fiber.current + scheduler.schedule_fiber do + scheduler.unblock(:test, fiber) + end + scheduler.block(:test) + result = :unblocked + end + + scheduler.run + + expect(result).to eq(:unblocked) + end + + it 'handles blocking count correctly' do + blocking_count = nil + block_started = false + mutex = Mutex.new + cv = ConditionVariable.new + + scheduler.schedule_fiber do + fiber = Fiber.current + scheduler.schedule_fiber do + # Wait until we've checked the blocking count + mutex.synchronize do + cv.wait(mutex) until block_started + end + scheduler.unblock(:test, fiber) + end + + # Signal that we're about to block + mutex.synchronize do + block_started = true + cv.signal + end + + scheduler.block(:test) + blocking_count = scheduler.instance_variable_get(:@blocking) + end + + run_thread = Thread.new { scheduler.run } + + # Wait for the fiber to signal it's blocking + mutex.synchronize do + cv.wait(mutex) until block_started + end + + # Now we know for sure the fiber is blocked + expect(scheduler.instance_variable_get(:@blocking)).to eq(1) + + # Signal the unblocking fiber to proceed + mutex.synchronize { cv.signal } + + # Wait for completion + run_thread.join + + # After unblocking, count should be back to 0 + expect(blocking_count).to eq(0) + end + end + + describe '#io_wait' do + it 'registers fiber for readable IO' do + r, w = Rex::Compat.pipe + registered = false + mutex = Mutex.new + cv = ConditionVariable.new + + scheduler.schedule_fiber do + mutex.synchronize do + registered = true + cv.signal + end + scheduler.io_wait(r, IO::READABLE, nil) + r.read_nonblock(100) + r.close + end + + run_thread = Thread.new { scheduler.run } + + # Wait for fiber to register + mutex.synchronize do + cv.wait(mutex) until registered + end + + readable = scheduler.instance_variable_get(:@readable) + expect(readable.keys).to include(r) + + # Write data then close to trigger completion + w.write("test data") + w.close + + run_thread.join(2) + end + + it 'registers fiber for writable IO' do + r, w = Rex::Compat.pipe + fiber_resumed = false + + scheduler.schedule_fiber do + scheduler.io_wait(w, IO::WRITABLE, nil) + fiber_resumed = true + end + + run_thread = Thread.new { scheduler.run } + run_thread.join(2) + + # The fiber should have been resumed since pipes are immediately writable + expect(fiber_resumed).to be true + + r.close rescue nil + w.close rescue nil + end + + it 'returns events mask' do + r, w = Rex::Compat.pipe + events = nil + + scheduler.schedule_fiber do + events = scheduler.io_wait(r, IO::READABLE, nil) + r.read_nonblock(100) + r.close + end + + run_thread = Thread.new { scheduler.run } + sleep 0.1 + + w.write("data") + w.close + + run_thread.join(2) + + expect(events).to eq(IO::READABLE) + end + end + + describe '#kernel_sleep' do + it 'blocks for specified duration' do + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end_time = nil + + scheduler.schedule_fiber do + scheduler.kernel_sleep(0.2) + end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + scheduler.run + + elapsed = end_time - start_time + expect(elapsed).to be >= 0.2 + expect(elapsed).to be < 0.5 + end + + it 'returns true' do + result = nil + + scheduler.schedule_fiber do + result = scheduler.kernel_sleep(0.1) + end + + scheduler.run + + expect(result).to be true + end + + it 'handles nil duration as indefinite block' do + blocked = false + + scheduler.schedule_fiber do + fiber = Fiber.current + scheduler.schedule_fiber do + sleep 0.1 + scheduler.unblock(:sleep, fiber) + end + scheduler.kernel_sleep(nil) + blocked = true + end + + scheduler.run + + expect(blocked).to be true + end + end + + describe '#block' do + it 'increments blocking count for indefinite block' do + block_started = false + unblock_ready = false + mutex = Mutex.new + cv = ConditionVariable.new + + scheduler.schedule_fiber do + fiber = Fiber.current + scheduler.schedule_fiber do + # Wait for signal to unblock + mutex.synchronize do + cv.wait(mutex) until unblock_ready + end + scheduler.unblock(:test, fiber) + end + + # Signal that we're about to block + mutex.synchronize do + block_started = true + cv.signal + end + + scheduler.block(:test) + end + + run_thread = Thread.new { scheduler.run } + + # Wait for fiber to be blocked + mutex.synchronize do + cv.wait(mutex) until block_started + end + + expect(scheduler.instance_variable_get(:@blocking)).to eq(1) + + # Signal unblock and wait for completion + mutex.synchronize do + unblock_ready = true + cv.signal + end + + run_thread.join(2) + + expect(scheduler.instance_variable_get(:@blocking)).to eq(0) + end + + it 'uses waiting queue for timed block' do + block_started = false + mutex = Mutex.new + cv = ConditionVariable.new + + scheduler.schedule_fiber do + # Signal that we're about to block + mutex.synchronize do + block_started = true + cv.signal + end + + scheduler.block(:test, 0.2) + end + + run_thread = Thread.new { scheduler.run } + + # Wait for fiber to start blocking + mutex.synchronize do + cv.wait(mutex) until block_started + end + + waiting = scheduler.instance_variable_get(:@waiting) + expect(waiting.size).to eq(1) + + # Wait for the block to complete naturally + run_thread.join(1) + end + + it 'resumes after timeout' do + resumed = false + + scheduler.schedule_fiber do + scheduler.block(:test, 0.15) + resumed = true + end + + scheduler.run + + expect(resumed).to be true + end + + it 'cleans up waiting queue after resume' do + scheduler.schedule_fiber do + scheduler.block(:test, 0.1) + end + + scheduler.run + + waiting = scheduler.instance_variable_get(:@waiting) + expect(waiting).to be_empty + end + end + + describe '#unblock' do + it 'adds fiber to ready queue' do + fiber = Fiber.new(blocking: false) { Fiber.yield } + fiber.resume + + scheduler.unblock(:test, fiber) + + ready = scheduler.instance_variable_get(:@ready) + expect(ready).to include(fiber) + end + + it 'signals via urgent pipe' do + fiber = Fiber.new(blocking: false) { Fiber.yield } + fiber.resume + + urgent_pipe = scheduler.instance_variable_get(:@urgent) + + scheduler.unblock(:test, fiber) + + readable, = IO.select([urgent_pipe.first], nil, nil, 0.1) + expect(readable).to include(urgent_pipe.first) + end + + it 'allows blocked fiber to resume' do + result = nil + + scheduler.schedule_fiber do + fiber = Fiber.current + scheduler.schedule_fiber do + sleep 0.1 + scheduler.unblock(:test, fiber) + end + scheduler.block(:test) + result = :resumed + end + + scheduler.run + + expect(result).to eq(:resumed) + end + end + + describe '#close' do + it 'completes remaining work' do + result = nil + + scheduler.schedule_fiber { result = :done } + scheduler.close + + expect(result).to eq(:done) + end + + it 'closes urgent pipe' do + urgent = scheduler.instance_variable_get(:@urgent) + original_pipes = urgent.dup + + scheduler.close + + original_pipes.each do |pipe| + expect(pipe.closed?).to be true + end + end + + it 'sets urgent to nil' do + scheduler.close + expect(scheduler.instance_variable_get(:@urgent)).to be_nil + end + end + + describe 'integration scenarios' do + it 'handles multiple concurrent fibers with IO' do + results = [] + pipes = 3.times.map { Rex::Compat.pipe } + + pipes.each_with_index do |(r, w), index| + scheduler.schedule_fiber do + scheduler.io_wait(r, IO::READABLE, nil) + data = r.read_nonblock(100) + results << "fiber#{index}: #{data}" + end + end + + run_thread = Thread.new { scheduler.run } + + pipes.each_with_index do |(r, w), index| + w.write("data#{index}") + w.close + end + + run_thread.join + + expect(results.size).to eq(3) + expect(results).to include("fiber0: data0", "fiber1: data1", "fiber2: data2") + + pipes.each { |r, w| r.close rescue nil } + end + + it 'handles mixed blocking and IO operations' do + results = [] + + scheduler.schedule_fiber do + results << :start + scheduler.kernel_sleep(0.1) + results << :after_sleep + end + + r, w = Rex::Compat.pipe + scheduler.schedule_fiber do + scheduler.io_wait(r, IO::READABLE, nil) + results << :after_io + r.close + end + + run_thread = Thread.new { scheduler.run } + + w.write("data") + w.close + + run_thread.join + + expect(results).to include(:start, :after_sleep, :after_io) + end + + it 'handles nested fiber scheduling' do + results = [] + + scheduler.schedule_fiber do + results << 1 + scheduler.schedule_fiber do + results << 2 + scheduler.schedule_fiber do + results << 3 + end + end + end + + scheduler.run + + expect(results).to eq([1, 2, 3]) + end + end +end \ No newline at end of file diff --git a/spec/rex/io/relay_manager_spec.rb b/spec/rex/io/relay_manager_spec.rb new file mode 100644 index 0000000..8b1cedc --- /dev/null +++ b/spec/rex/io/relay_manager_spec.rb @@ -0,0 +1,415 @@ +require 'rex/io/relay_manager' +require 'socket' + +RSpec.describe Rex::IO::RelayManager do + let(:manager) { described_class.new } + + after do + # Clean up the thread if it's still running + if manager.thread && manager.thread.alive? + manager.thread.kill + manager.thread.join(1) + end + end + + describe '#initialize' do + it 'initializes with no thread' do + expect(manager.thread).to be_nil + end + + it 'creates a FiberScheduler' do + scheduler = manager.instance_variable_get(:@scheduler) + expect(scheduler).to be_a(Rex::IO::FiberScheduler) + end + end + + describe '#add_relay' do + it 'schedules a relay fiber' do + r, w = IO.pipe + sink = StringIO.new + + manager.add_relay(r, sink: sink, name: 'test') + + scheduler = manager.instance_variable_get(:@scheduler) + pending = scheduler.instance_variable_get(:@pending) + + expect(pending.size).to be >= 1 + + w.close + r.close + end + + it 'starts the manager thread if not running' do + r, w = Socket.pair(:UNIX, :STREAM, 0) + sink = StringIO.new + + expect(manager.thread).to be_nil + + manager.add_relay(r, sink: sink, name: 'test') + + expect(manager.thread).to be_a(Thread) + expect(manager.thread.alive?).to be true + + # Close gracefully - write side first + w.close + end + + it 'does not start a new thread if already running' do + r1, w1 = Socket.pair(:UNIX, :STREAM, 0) + r2, w2 = Socket.pair(:UNIX, :STREAM, 0) + sink = StringIO.new + + manager.add_relay(r1, sink: sink, name: 'test1') + first_thread = manager.thread + + manager.add_relay(r2, sink: sink, name: 'test2') + second_thread = manager.thread + + expect(first_thread).to eq(second_thread) + + # Close write sides to trigger EOF, let relays clean up read sides + w1.close + w2.close + end + + it 'relays data from socket to sink with write method' do + r, w = IO.pipe + sink = StringIO.new + + manager.add_relay(r, sink: sink, name: 'test') + + w.write("Hello, World!") + w.close + manager.thread.join(1) + + expect(sink.string).to eq("Hello, World!") + + r.close + end + + it 'relays data from socket to sink with call method' do + r, w = IO.pipe + received_data = [] + sink = proc { |data| received_data << data } + + manager.add_relay(r, sink: sink, name: 'test') + + w.write("Test data") + w.close + manager.thread.join(1) + + expect(received_data.join).to eq("Test data") + + r.close + end + + it 'handles multiple data chunks' do + r, w = IO.pipe + sink = StringIO.new + + manager.add_relay(r, sink: sink, name: 'test') + + w.write("Chunk 1\n") + sleep 0.1 + w.write("Chunk 2\n") + sleep 0.1 + w.write("Chunk 3\n") + w.close + manager.thread.join(1) + + expect(sink.string).to eq("Chunk 1\nChunk 2\nChunk 3\n") + + r.close + end + + it 'calls the on_exit callback when socket closes' do + r, w = IO.pipe + sink = StringIO.new + callback_called = false + on_exit = proc { callback_called = true } + + manager.add_relay(r, sink: sink, name: 'test', on_exit: on_exit) + w.close + manager.thread.join(1) + + expect(callback_called).to be true + end + + it 'closes socket if not already closed' do + r, w = IO.pipe + sink = StringIO.new + + manager.add_relay(r, sink: sink, name: 'test') + w.close + manager.thread.join(1) + + expect(r.closed?).to be true + end + + it 'handles EOFError gracefully' do + r, w = IO.pipe + sink = StringIO.new + + manager.add_relay(r, sink: sink, name: 'test') + w.close + manager.thread.join(1) + + # Should complete without raising error + expect(sink.string).to eq("") + end + + it 'handles already closed socket' do + r, w = IO.pipe + sink = StringIO.new + + r.close + w.close + + expect { + manager.add_relay(r, sink: sink, name: 'test') + manager.thread.join(1) + }.not_to raise_error + end + end + + describe '.io_write_all' do + it 'writes all data to IO in one call' do + io = StringIO.new + data = "Complete data" + + result = described_class.io_write_all(io, data) + + expect(io.string).to eq("Complete data") + expect(result).to eq(data.bytesize) + end + + it 'handles partial writes' do + io = double('io') + data = "0123456789" + + # Simulate partial writes: 3 bytes, then 4 bytes, then 3 bytes + allow(io).to receive(:write).and_return(3, 4, 3) + + result = described_class.io_write_all(io, data) + + expect(io).to have_received(:write).exactly(3).times + expect(result).to eq(10) + end + + it 'writes correct slices on partial writes' do + io = double('io') + data = "ABCDEFGHIJ" + written_data = [] + + allow(io).to receive(:write) do |slice| + written_data << slice + slice.bytesize # Write all provided data + end.and_return(4, 6) + + described_class.io_write_all(io, data) + + expect(written_data).to eq(["ABCDEFGHIJ", "EFGHIJ"]) + end + + it 'handles empty data' do + io = StringIO.new + data = "" + + result = described_class.io_write_all(io, data) + + expect(io.string).to eq("") + expect(result).to eq(0) + end + + it 'handles binary data' do + io = StringIO.new.binmode + data = "\x00\x01\x02\xFF".b + + result = described_class.io_write_all(io, data) + + expect(io.string).to eq(data) + expect(result).to eq(4) + end + end + + describe 'concurrent relays' do + it 'handles multiple concurrent relays' do + pipes = 3.times.map { IO.pipe } + sinks = 3.times.map { StringIO.new } + + pipes.each_with_index do |(r, w), index| + manager.add_relay(r, sink: sinks[index], name: "relay:#{index}") + end + + pipes.each_with_index do |(r, w), index| + w.write("Data from relay #{index}") + w.close + end + + manager.thread.join(1) + + sinks.each_with_index do |sink, index| + expect(sink.string).to eq("Data from relay #{index}") + end + + pipes.each { |r, w| r.close rescue nil } + end + + it 'relays data independently for each relay' do + r1, w1 = IO.pipe + r2, w2 = IO.pipe + sink1 = StringIO.new + sink2 = StringIO.new + + manager.add_relay(r1, sink: sink1, name: 'relay1') + manager.add_relay(r2, sink: sink2, name: 'relay2') + w1.write("First relay") + w2.write("Second relay") + w1.close + w2.close + manager.thread.join(1) + + expect(sink1.string).to eq("First relay") + expect(sink2.string).to eq("Second relay") + end + end + + describe 'error handling' do + it 'raises ArgumentError for unsupported sink type' do + r, w = IO.pipe + unsupported_sink = "not a valid sink" + + manager.add_relay(r, sink: unsupported_sink, name: 'test') + + w.write("data") + w.close + + sleep 1 + + # The error should be caught and logged, relay should stop + expect(r.closed?).to be true + + r.close rescue nil + end + + it 'continues other relays when one fails' do + r1, w1 = IO.pipe + r2, w2 = IO.pipe + bad_sink = "invalid" + good_sink = StringIO.new + + manager.add_relay(r1, sink: bad_sink, name: 'bad') + manager.add_relay(r2, sink: good_sink, name: 'good') + + w1.write("data1") + w2.write("data2") + w1.close + w2.close + manager.thread.join(1) + + # Good relay should still work + expect(good_sink.string).to eq("data2") + end + end + + describe 'large data transfers' do + it 'handles data larger than buffer size' do + r, w = IO.pipe + sink = StringIO.new + large_data = "X" * 100_000 # Larger than 32KB buffer + + manager.add_relay(r, sink: sink, name: 'test') + + w.write(large_data) + w.close + manager.thread.join(1) + + expect(sink.string).to eq(large_data) + end + + it 'relays data in chunks' do + r, w = IO.pipe + chunks_received = [] + sink = proc { |data| chunks_received << data.bytesize } + large_data = "A" * 100_000 + + manager.add_relay(r, sink: sink, name: 'test') + + w.write(large_data) + w.close + manager.thread.join(1) + + # Should have received multiple chunks + expect(chunks_received.size).to be > 1 + expect(chunks_received.sum).to eq(100_000) + end + end + + describe 'fiber scheduler integration' do + it 'sets the fiber scheduler for the thread' do + r, w = IO.pipe + sink = StringIO.new + thread_scheduler = nil + + # Capture the scheduler from inside the thread + allow_any_instance_of(Rex::IO::FiberScheduler).to receive(:run) do + thread_scheduler = Fiber.scheduler + end + + manager.add_relay(r, sink: sink, name: 'test') + w.close + manager.thread.join(1) + + expect(thread_scheduler).to be_a(Rex::IO::FiberScheduler) + end + end + + describe 'callback functionality' do + it 'passes correct data to callable sink' do + r, w = IO.pipe + received_chunks = [] + sink = proc { |data| received_chunks << data } + + manager.add_relay(r, sink: sink, name: 'test') + + w.write("First") + sleep 0.1 + w.write("Second") + w.close + manager.thread.join(1) + + expect(received_chunks).to eq(["First", "Second"]) + end + + it 'calls on_exit even when error occurs' do + r, w = IO.pipe + bad_sink = "invalid" + exit_called = false + on_exit = proc { exit_called = true } + + manager.add_relay(r, sink: bad_sink, name: 'test', on_exit: on_exit) + + w.write("data") + w.close + manager.thread.join(1) + + expect(exit_called).to be true + end + + it 'calls on_exit with normal completion' do + r, w = IO.pipe + sink = StringIO.new + exit_called = false + on_exit = proc { exit_called = true } + + manager.add_relay(r, sink: sink, name: 'test', on_exit: on_exit) + sleep 0.1 + + w.write("data") + w.close + manager.thread.join(1) + + expect(exit_called).to be true + end + end +end \ No newline at end of file