Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions lib/rex/io/fiber_scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
require 'fiber'
require 'io/nonblock'
require 'rex/compat'

module Rex
module IO

# This fiber scheduler is heavily based on
# https://blog.monotone.dev/ruby/2020/12/25/ruby-3-fiber.html
#
# It implements enough of the Fiber::Scheduler interface (Ruby 3.0+) to drive
# non-blocking fibers from a single #run loop. Work can be injected from other
# threads via #schedule_fiber; a self-pipe (@urgent) wakes the IO.select call
# when that happens.
class FiberScheduler
  def initialize
    @readable = {}               # IO => fiber waiting for it to become readable
    @writable = {}               # IO => fiber waiting for it to become writable
    @waiting = {}                # fiber => monotonic deadline to resume at
    @ready = []                  # fibers released via #unblock, pending resume
    @pending = []                # blocks scheduled from other threads
    @blocking = 0                # number of fibers blocked with no timeout
    @urgent = Rex::Compat.pipe   # self-pipe [reader, writer] used to wake #run
    @mutex = Mutex.new           # guards @pending and @ready (cross-thread access)
  end

  # This allows the fiber to be scheduled in the #run thread from another thread
  def schedule_fiber(&block)
    @mutex.synchronize do
      @pending << block
    end
    # Wake up the scheduler; a failed wake-up is harmless because the select
    # loop polls every 0.1s anyway.
    @urgent.last.write_nonblock('.') rescue nil
  end

  # Main scheduler loop. Runs until no fiber is waiting on IO, a timer or a
  # blocking operation, and no newly scheduled work is pending.
  def run
    while @readable.any? || @writable.any? || @waiting.any? || @blocking.positive? || @ready.any? || @pending.any?
      # Start any fibers scheduled from other threads.
      pending_blocks = []
      @mutex.synchronize do
        pending_blocks = @pending.dup
        @pending.clear
      end

      pending_blocks.each do |blk|
        Fiber.new(blocking: false, &blk).resume
      end

      begin
        readable, writable = ::IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0.1)
      rescue ::IOError
        # One of the registered IOs was closed behind our back; drop the dead
        # registrations and try again.
        cleanup_closed_ios
        next
      end

      # Drain the self-pipe so it does not stay permanently readable.
      if readable&.include?(@urgent.first)
        @urgent.first.read_nonblock(1024) rescue nil
      end

      readable&.each do |io|
        next if io == @urgent.first

        fiber = @readable.delete(io)
        # A fiber may have finished (or already been resumed by another event)
        # since it registered; resuming a dead fiber raises FiberError.
        fiber.resume if fiber&.alive?
      end

      writable&.each do |io|
        fiber = @writable.delete(io)
        fiber.resume if fiber&.alive?
      end

      # Resume fibers whose sleep / io_wait deadline has expired.
      @waiting.keys.each do |fiber|
        deadline = @waiting[fiber]
        # The entry may have been removed by a fiber resumed earlier in this
        # sweep; guard against a nil deadline.
        next unless deadline && current_time >= deadline

        @waiting.delete(fiber)
        fiber.resume if fiber.alive?
      end

      # Resume fibers released via #unblock (possibly from other threads).
      ready = []
      @mutex.synchronize do
        ready, @ready = @ready, []
      end
      ready.each do |fiber|
        fiber.resume if fiber.alive?
      end
    end
  end

  # Fiber::Scheduler hook: park the current fiber until +io+ is ready for the
  # requested +events+ or the +timeout+ (seconds, may be nil) expires.
  def io_wait(io, events, timeout)
    fiber = Fiber.current
    unless (events & ::IO::READABLE).zero?
      @readable[io] = fiber
    end
    unless (events & ::IO::WRITABLE).zero?
      @writable[io] = fiber
    end
    # Honor the timeout so a fiber is not parked forever on an idle IO.
    @waiting[fiber] = current_time + timeout if timeout

    begin
      Fiber.yield
    ensure
      # Whichever event woke us (IO readiness or timeout), drop any stale
      # registrations so this fiber cannot be resumed a second time.
      @readable.delete(io) if @readable[io] == fiber
      @writable.delete(io) if @writable[io] == fiber
      @waiting.delete(fiber)
    end
    events
  end

  # Fiber::Scheduler hook: non-blocking Kernel#sleep.
  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  # Fiber::Scheduler hook: park the current fiber, optionally with a timeout.
  # Without a timeout the fiber stays parked until #unblock releases it.
  def block(blocker, timeout = nil)
    if timeout
      @waiting[Fiber.current] = current_time + timeout
      begin
        Fiber.yield
      ensure
        @waiting.delete(Fiber.current)
      end
    else
      @blocking += 1
      begin
        Fiber.yield
      ensure
        @blocking -= 1
      end
    end
  end

  # Fiber::Scheduler hook: release a fiber parked by #block. May be invoked
  # from another thread, so access to @ready is synchronized and the select
  # loop is woken via the self-pipe.
  def unblock(blocker, fiber)
    @mutex.synchronize do
      @ready << fiber
    end
    @urgent.last.write_nonblock('.') rescue nil
  end

  # Drain all remaining work, then release the self-pipe.
  def close
    run
    @urgent.each(&:close)
    @urgent = nil
  end

  def closed?
    @urgent.nil?
  end

  # Fiber::Scheduler hook: start a new non-blocking fiber immediately.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  private

  # Monotonic clock, immune to wall-clock adjustments.
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Remove registrations whose IO has been closed or whose fiber is dead.
  def cleanup_closed_ios
    @readable.delete_if { |io, _| io.closed? rescue true }
    @writable.delete_if { |io, _| io.closed? rescue true }
    @waiting.delete_if { |fiber, _| !fiber || !fiber.alive? rescue true }
  end
end

end
end
113 changes: 113 additions & 0 deletions lib/rex/io/relay_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
require 'rex/io/fiber_scheduler'

module Rex
module IO

# An IO RelayManager which will read data from a socket and write it to a sink
# using a background thread.
#
# All relays share one background thread which drives a FiberScheduler; each
# relay is a fiber that pumps data from its socket to its sink. The thread is
# started lazily on the first #add_relay call and exits once every relay has
# finished.
class RelayManager
  # @return [Thread, nil] The background thread driving the relays, or nil if
  #   no relay has been added yet.
  attr_reader :thread

  def initialize
    @thread = nil
    @scheduler = FiberScheduler.new
  end

  # Add an IO relay to the manager. This will start relaying data from the source
  # socket to the destination sink immediately. An optional "on_exit" callback
  # can be provided which will be called when the socket is closed.
  #
  # @param sock [::Socket] The source socket that data will be read from.
  # @param sink [#write, #call] A data destination where read data will be sent. It is called
  #   with one parameter, the data to be transferred. If the object exposes a #write method, it will
  #   be called repeatedly until all data is processed and the return value will be used to determine
  #   how much of the data was written. If the object exposes a #call method, it will be called once
  #   and must handle processing all the data it is provided.
  # @param name [String] A human-friendly name for the relay used in debug output.
  # @param on_exit [#call] A callback invoked once the relay terminates, i.e. when the source
  #   socket can no longer be read from (EOF, error, or closure).
  def add_relay(sock, sink: nil, name: nil, on_exit: nil)
    @scheduler.schedule_fiber do
      relay_fiber(sock, sink, name, on_exit: on_exit)
    end

    # Lazily start the background thread (or restart it if every previous
    # relay finished and the thread exited).
    start unless running?
  end

  # Write all data to the specified IO. This is intended to be used in scenarios
  # where partial writes are possible but not desirable.
  #
  # @param io [#write] An object to write the data to. It must return a number indicating
  #   how many bytes of the provided data were processed.
  # @param data [String] The data that should be written.
  # @return [Integer] The total number of bytes written (always data.bytesize).
  def self.io_write_all(io, data)
    offset = 0
    # Keep writing the unwritten tail until the whole buffer is consumed.
    while offset < data.bytesize
      written = io.write(data.byteslice(offset..-1))
      offset += written
    end
    data.bytesize
  end

  private

  # Whether the background scheduler thread is currently alive.
  def running?
    @thread && @thread.alive?
  end

  # Start the background thread that runs the fiber scheduler.
  #
  # @return [Boolean] true if a new thread was started, false if one was
  #   already running.
  def start
    return false if running?

    @thread = Thread.new { run }
    true
  end

  # Thread body: install the fiber scheduler for this thread, run it until all
  # relays have finished, then restore whatever scheduler was there before.
  def run
    old_scheduler = Fiber.scheduler
    # A fiber scheduler can be set per-thread
    Fiber.set_scheduler(@scheduler)

    # Run the scheduler (blocks here)
    @scheduler.run
  ensure
    Fiber.set_scheduler(old_scheduler)
  end

  # Fiber body for a single relay: pump data from sock to sink until EOF or an
  # error, then close the socket and fire the on_exit callback.
  def relay_fiber(sock, sink, name, on_exit: nil)
    loop do
      break if sock.closed?

      # With the fiber scheduler installed, readpartial cooperatively yields
      # to other relays until data arrives.
      buf = sock.readpartial(32_768)
      write_to_sink(sink, buf)
    end
  rescue EOFError
    # Normal end-of-stream; nothing to report.
    nil
  rescue => e
    message = "#{self.class.name}#relay_fiber(name: #{name}): #{e.class} #{e}"
    if defined?(elog) # elog is defined by framework, otherwise use stderr
      elog(message, error: e)
    else
      $stderr.puts message
    end
  ensure
    unless sock.closed?
      sock.close rescue nil
    end

    on_exit.call if on_exit
  end

  # Dispatch data to the sink based on the interface it exposes
  # (#write or #call — see #add_relay for the contract of each).
  #
  # @raise [ArgumentError] If the sink supports neither #write nor #call.
  def write_to_sink(sink, data)
    if sink.respond_to?(:write)
      self.class.io_write_all(sink, data)

    elsif sink.respond_to?(:call)
      sink.call(data)

    else
      raise ArgumentError, "Unsupported sink type: #{sink.inspect}"
    end
  end
end

end
end
Loading
Loading