diff --git a/lib/event_source.rb b/lib/event_source.rb index 6a1bfbb3..0260c0c8 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -35,6 +35,8 @@ require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/enqueue_delayed_message' +require 'event_source/operations/delayed_message_handler' # Event source provides ability to compose, publish and subscribe to events module EventSource diff --git a/lib/event_source/operations/delayed_message_handler.rb b/lib/event_source/operations/delayed_message_handler.rb new file mode 100644 index 00000000..734fe1a6 --- /dev/null +++ b/lib/event_source/operations/delayed_message_handler.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require 'dry/monads' +require 'dry/monads/do' + +module EventSource + module Operations + class DelayedMessageHandler + include Dry::Monads[:result, :do] + include EventSource::Command + include EventSource::Logging + + def call(payload, metadata) + headers = yield fetch_headers(metadata) + result = yield execute(payload, headers) + + Success(result) + end + + private + + def fetch_headers(metadata) + headers = metadata[:headers].symbolize_keys + + Success(headers) + end + + # Add retry_count + def execute(payload, headers) + headers[:retry_count] += 1 + headers[:retry_service].constantize.new.call(payload, {delay_options: headers}) + + Success(true) + end + end + end +end diff --git a/lib/event_source/operations/enqueue_delayed_message.rb b/lib/event_source/operations/enqueue_delayed_message.rb new file mode 100644 index 00000000..94f28ad6 --- /dev/null +++ b/lib/event_source/operations/enqueue_delayed_message.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require 'dry/monads' +require 'dry/monads/do' + +module EventSource + module Operations + class EnqueueDelayedMessage + include Dry::Monads[:result, :do] + include EventSource::Command + include EventSource::Logging + + def call(params) + delay_options = yield validate_retry_count(params[:delay_options]) + event = yield build_event(params[:payload], delay_options) + result = yield publish_message(event) + + Success(result) + end + + private + + def validate_retry_count(delay_options) + if delay_options[:retry_count] > delay_options[:retry_limit] + logger.info("Enqueue Delayed message failed, due to remaining retry count is #{delay_options[:retry_limit]}") + Failure("retry limit reached. enqueue failed!!") + else + Success(delay_options) + end + end + + def build_event(payload, delay_options) + result = event(delay_options[:event_name], { + attributes: {payload: payload}, + headers: delay_options.except(:call_location, :retry_exceptions) + .merge(:'x-delay' => delay_options[:retry_delay]) + }) + + unless Rails.env.test? + logger.info('-' * 100) + logger.info( + "Delayed message publish to AMQP, + event_key: #{delay_options[:event_name]}, attributes: #{payload}, result: #{result}" + ) + logger.info('-' * 100) + end + + result + end + + def publish_message(event) + Success(event.publish) + end + end + end +end diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 1bf62634..70bb79ac 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -23,12 +23,14 @@ def initialize(channel_proxy, exchange_bindings) end def bunny_exchange_for(bindings) + opts = bindings.slice(:durable, :auto_delete, :vhost) + opts[:arguments] = bindings.slice(:'x-delayed-type') if bindings[:'x-delayed-type'] exchange = Bunny::Exchange.new( channel_proxy.subject, bindings[:type], bindings[:name], - bindings.slice(:durable, :auto_delete, :vhost) + opts ) exchange.on_return do |return_info, properties, content| logger.error "Got a returned message: #{content} with return info: #{return_info}, properties: #{properties}" @@ -43,6 +45,7 @@ def bunny_exchange_for(bindings) # @param [Hash] publish_bindings # @param [Hash] headers def publish(payload:, publish_bindings:, headers: {}) + publish_options = headers.delete(:publish_options) || {} bunny_publish_bindings = sanitize_bindings((publish_bindings || {}).to_h) bunny_publish_bindings[:correlation_id] = headers.delete(:correlation_id) if headers[:correlation_id] bunny_publish_bindings[:headers] = headers unless headers.empty? diff --git a/lib/event_source/protocols/http/faraday_connection_proxy.rb b/lib/event_source/protocols/http/faraday_connection_proxy.rb index cb5c4a23..ed58592e 100644 --- a/lib/event_source/protocols/http/faraday_connection_proxy.rb +++ b/lib/event_source/protocols/http/faraday_connection_proxy.rb @@ -306,6 +306,10 @@ def request_content_type :xml end end + + def server + @server + end end end end diff --git a/lib/event_source/protocols/http/faraday_request_proxy.rb b/lib/event_source/protocols/http/faraday_request_proxy.rb index 2575d4db..aae2a03e 100644 --- a/lib/event_source/protocols/http/faraday_request_proxy.rb +++ b/lib/event_source/protocols/http/faraday_request_proxy.rb @@ -41,6 +41,12 @@ class FaradayRequestProxy attr_reader :channel_proxy, :subject, :name, :channel_item, :connection + PUBLISH_OPTION_DEFAULTS = { + interval: 1000, + max: 3, + retry_exceptions: [StandardError, SystemStackError, Faraday::TimeoutError].freeze + } + # @param channel_proxy [EventSource::Protocols::Http::FaradayChannelProxy] Http Channel proxy # @param async_api_channel_item [Hash] channel_bindings Channel definition and bindings # @return [Faraday::Request] @@ -74,6 +80,7 @@ def initialize(channel_proxy, async_api_channel_item) # @param [Hash] publish_bindings AsyncAPI HTTP message bindings # @return [Faraday::Response] response def publish(payload: nil, publish_bindings: {}, headers: {}) + delay_message_options = delay_message_options_for(headers) faraday_publish_bindings = sanitize_bindings(publish_bindings) faraday_publish_bindings[:headers] = (faraday_publish_bindings[:headers] || {}).merge(headers) text_payload = @@ -96,6 +103,23 @@ def publish(payload: nil, publish_bindings: {}, headers: {}) @channel_proxy.enqueue(response) logger.debug 'FaradayRequest#publish response enqueued.' response + rescue *delay_message_options[:retry_exceptions] => e + if delay_message_options[:event_name] + EventSource::Operations::EnqueueDelayedMessage.new.call(payload: payload, proxy: self, delay_options: delay_message_options) + end + end + + def delay_message_options_for(headers) + options = headers.delete(:publish_options) || {} + delay_defaults = PUBLISH_OPTION_DEFAULTS.merge(options[:retry_delay] || {}) + message_delay_defaults = { + :'x-delay' => delay_defaults[:interval], + :retry_limit => delay_defaults[:max], + :retry_count => 0 + }.merge(delay_default.slice(:event_name, :retry_service, :retry_exceptions)) + + message_delay_options = headers.delete(:delay_options) + message_delay_defaults.merge(message_delay_options) end def attach_payload_correlation_id(response, text_payload, payload) diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index 5032a0b1..c4ac5d8c 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -14,7 +14,7 @@ class Publisher < Module # publisher (for example: amqp) # @attr_reader [String] exchange name of the Exchange where event # messages are published - attr_reader :protocol, :publisher_key + attr_reader :protocol, :publisher_key, :options # Internal publisher registry, which is used to identify them globally # @@ -26,23 +26,25 @@ def self.publisher_container @publisher_container ||= Concurrent::Map.new end - def self.[](exchange_ref) + def self.[](exchange_ref, **options) # TODO: validate publisher already exists # raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id) - new(exchange_ref.first[0], exchange_ref.first[1]) + new(exchange_ref.first[0], exchange_ref.first[1], options) end # @api private - def initialize(protocol, publisher_key) + def initialize(protocol, publisher_key, options) super() @protocol = protocol @publisher_key = publisher_key + @options = options[:options] || {} end def included(base) self.class.publisher_container[base] = { publisher_key: publisher_key, - protocol: protocol + protocol: protocol, + options: options } base.extend(ClassMethods) @@ -66,7 +68,7 @@ def publish(event) logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}" publish_operation = find_publish_operation_for(publish_operation_name) - publish_operation.call(event.payload, {headers: event.headers}) + publish_operation.call(event.payload, {headers: event.headers.merge(publish_options: options)}) end def channel_name @@ -124,6 +126,10 @@ def protocol EventSource::Publisher.publisher_container[self][:protocol] end + def options + EventSource::Publisher.publisher_container[self][:options] + end + def logger EventSourceLogger.new.logger end