Skip to content
Open
2 changes: 2 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
require 'event_source/event'
require 'event_source/subscriber'
require 'event_source/operations/codec64'
require 'event_source/operations/enqueue_delayed_message'
require 'event_source/operations/delayed_message_handler'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand Down
37 changes: 37 additions & 0 deletions lib/event_source/operations/delayed_message_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'

module EventSource
module Operations
class DelayedMessageHandler
include Dry::Monads[:result, :do]
include EventSource::Command
include EventSource::Logging

def call(payload, metadata)
headers = yield fetch_headers(metadata)
result = yield execute(payload, headers)

Success(result)
end

private

def fetch_headers(metadata)
headers = metadata[:headers].symbolize_keys

Success(headers)
end

# Add retry_count
def execute(payload, headers)
headers[:retry_count] += 1
headers[:retry_service].constantize.new.call(payload, {delay_options: headers})

Success(true)
end
end
end
end
56 changes: 56 additions & 0 deletions lib/event_source/operations/enqueue_delayed_message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'

module EventSource
module Operations
class EnqueueDelayedMessage
include Dry::Monads[:result, :do]
include EventSource::Command
include EventSource::Logging

def call(params)
delay_options = yield validate_retry_count(params[:delay_options])
event = yield build_event(params[:payload], delay_options)
result = yield publish_message(event)

Success(result)
end

private

def validate_retry_count(delay_options)
if delay_options[:retry_count] > delay_options[:retry_limit]
logger.info("Enqueue Delayed message failed, due to remaining retry count is #{delay_options[:retry_limit]}")
Failure("retry limit reached. enqueue failed!!")
else
Success(delay_options)
end
end

def build_event(payload, delay_options)
result = event(delay_options[:event_name], {
attributes: {payload: payload},
headers: delay_options.except(:call_location, :retry_exceptions)
.merge(:'x-delay' => delay_options[:retry_delay])
})

unless Rails.env.test?
logger.info('-' * 100)
logger.info(
"Delayed message publish to AMQP,
event_key: #{delay_options[:event_name]}, attributes: #{payload}, result: #{result}"
)
logger.info('-' * 100)
end

result
end

def publish_message(event)
Success(event.publish)
end
end
end
end
5 changes: 4 additions & 1 deletion lib/event_source/protocols/amqp/bunny_exchange_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ def initialize(channel_proxy, exchange_bindings)
end

def bunny_exchange_for(bindings)
opts = bindings.slice(:durable, :auto_delete, :vhost)
opts[:arguments] = bindings.slice(:'x-delayed-type') if bindings[:'x-delayed-type']
exchange =
Bunny::Exchange.new(
channel_proxy.subject,
bindings[:type],
bindings[:name],
bindings.slice(:durable, :auto_delete, :vhost)
opts
)
exchange.on_return do |return_info, properties, content|
logger.error "Got a returned message: #{content} with return info: #{return_info}, properties: #{properties}"
Expand All @@ -43,6 +45,7 @@ def bunny_exchange_for(bindings)
# @param [Hash] publish_bindings
# @param [Hash] headers
def publish(payload:, publish_bindings:, headers: {})
publish_options = headers.delete(:publish_options) || {}
bunny_publish_bindings = sanitize_bindings((publish_bindings || {}).to_h)
bunny_publish_bindings[:correlation_id] = headers.delete(:correlation_id) if headers[:correlation_id]
bunny_publish_bindings[:headers] = headers unless headers.empty?
Expand Down
4 changes: 4 additions & 0 deletions lib/event_source/protocols/http/faraday_connection_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ def request_content_type
:xml
end
end

def server
@server
end
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/event_source/protocols/http/faraday_request_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class FaradayRequestProxy

attr_reader :channel_proxy, :subject, :name, :channel_item, :connection

PUBLISH_OPTION_DEFAULTS = {
interval: 1000,
max: 3,
retry_exceptions: [StandardError, SystemStackError, Faraday::TimeoutError].freeze
}

# @param channel_proxy [EventSource::Protocols::Http::FaradayChannelProxy] Http Channel proxy
# @param async_api_channel_item [Hash] channel_bindings Channel definition and bindings
# @return [Faraday::Request]
Expand Down Expand Up @@ -74,6 +80,7 @@ def initialize(channel_proxy, async_api_channel_item)
# @param [Hash] publish_bindings AsyncAPI HTTP message bindings
# @return [Faraday::Response] response
def publish(payload: nil, publish_bindings: {}, headers: {})
delay_message_options = delay_message_options_for(headers)
faraday_publish_bindings = sanitize_bindings(publish_bindings)
faraday_publish_bindings[:headers] = (faraday_publish_bindings[:headers] || {}).merge(headers)
text_payload =
Expand All @@ -96,6 +103,23 @@ def publish(payload: nil, publish_bindings: {}, headers: {})
@channel_proxy.enqueue(response)
logger.debug 'FaradayRequest#publish response enqueued.'
response
rescue *delay_message_options[:retry_exceptions] => e
if delay_message_options[:event_name]
EventSource::Operations::EnqueueDelayedMessage.new.call(payload: payload, proxy: self, delay_options: delay_message_options)
end
end

def delay_message_options_for(headers)
options = headers.delete(:publish_options) || {}
delay_defaults = PUBLISH_OPTION_DEFAULTS.merge(options[:retry_delay] || {})
message_delay_defaults = {
:'x-delay' => delay_defaults[:interval],
:retry_limit => delay_defaults[:max],
:retry_count => 0
}.merge(delay_default.slice(:event_name, :retry_service, :retry_exceptions))

message_delay_options = headers.delete(:delay_options)
message_delay_defaults.merge(message_delay_options)
end

def attach_payload_correlation_id(response, text_payload, payload)
Expand Down
18 changes: 12 additions & 6 deletions lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Publisher < Module
# publisher (for example: amqp)
# @attr_reader [String] exchange name of the Exchange where event
# messages are published
attr_reader :protocol, :publisher_key
attr_reader :protocol, :publisher_key, :options

# Internal publisher registry, which is used to identify them globally
#
Expand All @@ -26,23 +26,25 @@ def self.publisher_container
@publisher_container ||= Concurrent::Map.new
end

def self.[](exchange_ref)
def self.[](exchange_ref, **options)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
new(exchange_ref.first[0], exchange_ref.first[1])
new(exchange_ref.first[0], exchange_ref.first[1], options)
end

# @api private
def initialize(protocol, publisher_key)
def initialize(protocol, publisher_key, options)
super()
@protocol = protocol
@publisher_key = publisher_key
@options = options[:options] || {}
end

def included(base)
self.class.publisher_container[base] = {
publisher_key: publisher_key,
protocol: protocol
protocol: protocol,
options: options
}
base.extend(ClassMethods)

Expand All @@ -66,7 +68,7 @@ def publish(event)

logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}"
publish_operation = find_publish_operation_for(publish_operation_name)
publish_operation.call(event.payload, {headers: event.headers})
publish_operation.call(event.payload, {headers: event.headers.merge(publish_options: options)})
end

def channel_name
Expand Down Expand Up @@ -124,6 +126,10 @@ def protocol
EventSource::Publisher.publisher_container[self][:protocol]
end

def options
EventSource::Publisher.publisher_container[self][:options]
end

def logger
EventSourceLogger.new.logger
end
Expand Down