From 303e22c736aa641bb3c2f3f7ff9d875afd8338e5 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Fri, 29 Apr 2022 13:51:07 -0400 Subject: [PATCH 01/11] updated exchange bindings to fetch x-delayed-type --- lib/event_source/protocols/amqp/bunny_exchange_proxy.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 1bf62634..8293a3a2 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -28,7 +28,7 @@ def bunny_exchange_for(bindings) channel_proxy.subject, bindings[:type], bindings[:name], - bindings.slice(:durable, :auto_delete, :vhost) + bindings.slice(:durable, :auto_delete, :vhost, :'x-delayed-type') ) exchange.on_return do |return_info, properties, content| logger.error "Got a returned message: #{content} with return info: #{return_info}, properties: #{properties}" From 48d0cc03ab63e7c723cec9101b9f154a6755c0d7 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Fri, 29 Apr 2022 14:22:51 -0400 Subject: [PATCH 02/11] Added debugger for delayed message --- lib/event_source/protocols/amqp/bunny_exchange_proxy.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 8293a3a2..e3643ef3 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -23,6 +23,8 @@ def initialize(channel_proxy, exchange_bindings) end def bunny_exchange_for(bindings) + + binding.irb if bindings[:name] == 'x-delayed-message' exchange = Bunny::Exchange.new( channel_proxy.subject, From 7f9e1f722e6755e1c99635dc870515a2d88a057f Mon Sep 17 00:00:00 2001 From: Raghuram Date: Fri, 29 Apr 2022 14:28:59 -0400 Subject: [PATCH 03/11] added debug statements --- lib/event_source/protocols/amqp/bunny_exchange_proxy.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index e3643ef3..80d708da 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -23,8 +23,10 @@ def initialize(channel_proxy, exchange_bindings) end def bunny_exchange_for(bindings) - + puts "***************************************************" + puts bindings.inspect binding.irb if bindings[:name] == 'x-delayed-message' + puts "+++++++++++++++++++++++++++++++++++++++++++++++++++" exchange = Bunny::Exchange.new( channel_proxy.subject, From 535834e55217b4f416229d209d6e7125982b6d85 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Fri, 29 Apr 2022 14:32:39 -0400 Subject: [PATCH 04/11] added debug statements --- lib/event_source/protocols/amqp/bunny_exchange_proxy.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 80d708da..9e751784 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -25,7 +25,7 @@ def initialize(channel_proxy, exchange_bindings) def bunny_exchange_for(bindings) puts "***************************************************" puts bindings.inspect - binding.irb if bindings[:name] == 'x-delayed-message' + binding.irb if bindings[:type] == "x-delayed-message" puts "+++++++++++++++++++++++++++++++++++++++++++++++++++" exchange = Bunny::Exchange.new( From 65c53545391eac0006cbfea2e904f53678a1f272 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Fri, 29 Apr 2022 14:57:53 -0400 Subject: [PATCH 05/11] Fixed exchange params for delayed-message type --- lib/event_source/protocols/amqp/bunny_exchange_proxy.rb | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 9e751784..83c41e11 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -23,16 +23,14 @@ def initialize(channel_proxy, exchange_bindings) end def bunny_exchange_for(bindings) - puts "***************************************************" - puts bindings.inspect - binding.irb if bindings[:type] == "x-delayed-message" - puts "+++++++++++++++++++++++++++++++++++++++++++++++++++" + opts = bindings.slice(:durable, :auto_delete, :vhost) + opts[:arguments] = bindings.slice(:'x-delayed-type') if bindings[:'x-delayed-type'] exchange = Bunny::Exchange.new( channel_proxy.subject, bindings[:type], bindings[:name], - bindings.slice(:durable, :auto_delete, :vhost, :'x-delayed-type') + opts ) exchange.on_return do |return_info, properties, content| logger.error "Got a returned message: #{content} with return info: #{return_info}, properties: #{properties}" From 6b907f74198c12696c6b68b6bc1dddde96734254 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Wed, 4 May 2022 11:46:24 -0400 Subject: [PATCH 06/11] Load configuration for delayed queue --- lib/event_source/configure/servers.rb | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/event_source/configure/servers.rb b/lib/event_source/configure/servers.rb index 0f14a865..f2dd6ad0 100644 --- a/lib/event_source/configure/servers.rb +++ b/lib/event_source/configure/servers.rb @@ -19,6 +19,13 @@ def to_h end end + DelayConfiguration = Struct.new(:retry_delay, :retry_limit, :retry_exceptions, :event_name, :call_location) do + def to_h + attribute_hash = super() + attribute_hash.compact + end + end + ClientCertificateConfiguration = Struct.new(:client_certificate, :client_key, :client_key_password, :call_location) do def to_h attribute_hash = super() @@ -44,6 +51,13 @@ def client_certificate self.client_certificate_settings = cc_settings end + def delayed_queue + delay_settings = DelayConfiguration.new + delay_settings.call_location = caller(1) + yield(delay_settings) + self.delayed_queue_settings = delay_settings + end + def soap? soap_settings.present? end From 30a8e6ec71bee2d66caf72283840e7f5304a5d15 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Thu, 5 May 2022 13:10:27 -0400 Subject: [PATCH 07/11] Enhanced event source with delay message options and handlers --- lib/event_source.rb | 2 + lib/event_source/configure/servers.rb | 6 +- .../operations/delayed_message_handler.rb | 36 ++++++++++++ .../operations/enqueue_delayed_message.rb | 56 +++++++++++++++++++ .../http/faraday_connection_proxy.rb | 4 ++ .../protocols/http/faraday_request_proxy.rb | 10 ++++ 6 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 lib/event_source/operations/delayed_message_handler.rb create mode 100644 lib/event_source/operations/enqueue_delayed_message.rb diff --git a/lib/event_source.rb b/lib/event_source.rb index 6a1bfbb3..0260c0c8 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -35,6 +35,8 @@ require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/enqueue_delayed_message' +require 'event_source/operations/delayed_message_handler' # Event source provides ability to compose, publish and subscribe to events module EventSource diff --git a/lib/event_source/configure/servers.rb b/lib/event_source/configure/servers.rb index f2dd6ad0..3a9cc9f9 100644 --- a/lib/event_source/configure/servers.rb +++ b/lib/event_source/configure/servers.rb @@ -19,7 +19,7 @@ def to_h end end - DelayConfiguration = Struct.new(:retry_delay, :retry_limit, :retry_exceptions, :event_name, :call_location) do + DelayConfiguration = Struct.new(:retry_delay, :retry_limit, :retry_exceptions, :event_name, :publisher, :call_location) do def to_h attribute_hash = super() attribute_hash.compact @@ -36,7 +36,7 @@ def to_h AmqpConfiguration = Struct.new(:protocol, :ref, :host, :vhost, :port, :url, :user_name, :password, :call_location, :default_content_type) HttpConfiguration = Struct.new(:protocol, :ref, :host, :port, :url, :user_name, :password, :soap_settings, :client_certificate_settings, - :call_location, :default_content_type) do + :delayed_queue_settings, :call_location, :default_content_type) do def soap s_settings = SoapConfiguration.new s_settings.call_location = caller(1) @@ -71,8 +71,10 @@ def to_h main_hash = attribute_hash.compact main_hash.delete(:soap_settings) main_hash.delete(:client_certificate_settings) + main_hash.delete(:delayed_queue_settings) main_hash = main_hash.merge({ soap: soap_settings.to_h }) if soap? main_hash = main_hash.merge({ client_certificate: client_certificate_settings.to_h }) if client_certificate_settings + main_hash = main_hash.merge({ delayed_queue: delayed_queue_settings.to_h }) if delayed_queue_settings main_hash end end diff --git a/lib/event_source/operations/delayed_message_handler.rb b/lib/event_source/operations/delayed_message_handler.rb new file mode 100644 index 00000000..42d2f35d --- /dev/null +++ b/lib/event_source/operations/delayed_message_handler.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +require 'dry/monads' +require 'dry/monads/do' + +module EventSource + module Operations + class DelayedMessageHandler + include Dry::Monads[:result, :do] + include EventSource::Command + include EventSource::Logging + + def call(payload, metadata) + headers = yield fetch_headers(metadata) + result = yield execute(payload, headers) + + Success(result) + end + + private + + def fetch_headers(metadata) + headers = metadata[:headers].symbolize_keys + + Success(headers) + end + + def execute(payload, headers) + headers[:retry_limit] = (headers[:retry_limit].to_i - 1) + headers[:publisher].constantize.new.call(payload, {delay_options: headers}) + + Success(true) + end + end + end +end diff --git a/lib/event_source/operations/enqueue_delayed_message.rb b/lib/event_source/operations/enqueue_delayed_message.rb new file mode 100644 index 00000000..4ede7c26 --- /dev/null +++ b/lib/event_source/operations/enqueue_delayed_message.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require 'dry/monads' +require 'dry/monads/do' + +module EventSource + module Operations + class EnqueueDelayedMessage + include Dry::Monads[:result, :do] + include EventSource::Command + include EventSource::Logging + + def call(params) + delay_options = yield validate_retry_limit(params[:delay_options]) + event = yield build_event(params[:payload], delay_options) + result = yield publish_message(event) + + Success(result) + end + + private + + def validate_retry_limit(delay_options) + if delay_options[:retry_limit] <= 0 + logger.info("Enqueue Delayed message failed, due to remaining retry count is #{delay_options[:retry_limit]}") + Failure("retry limit reached. enqueue failed!!") + else + Success(delay_options) + end + end + + def build_event(payload, delay_options) + result = event(delay_options[:event_name], { + attributes: {payload: payload}, + headers: delay_options.except(:call_location, :retry_exceptions) + .merge(:'x-delay' => delay_options[:retry_delay]) + }) + + unless Rails.env.test? + logger.info('-' * 100) + logger.info( + "Delayed message publish to AMQP, + event_key: #{delay_options[:event_name]}, attributes: #{payload}, result: #{result}" + ) + logger.info('-' * 100) + end + + result + end + + def publish_message(event) + Success(event.publish) + end + end + end +end diff --git a/lib/event_source/protocols/http/faraday_connection_proxy.rb b/lib/event_source/protocols/http/faraday_connection_proxy.rb index cb5c4a23..ed58592e 100644 --- a/lib/event_source/protocols/http/faraday_connection_proxy.rb +++ b/lib/event_source/protocols/http/faraday_connection_proxy.rb @@ -306,6 +306,10 @@ def request_content_type :xml end end + + def server + @server + end end end end diff --git a/lib/event_source/protocols/http/faraday_request_proxy.rb b/lib/event_source/protocols/http/faraday_request_proxy.rb index 2575d4db..ca5a668d 100644 --- a/lib/event_source/protocols/http/faraday_request_proxy.rb +++ b/lib/event_source/protocols/http/faraday_request_proxy.rb @@ -74,6 +74,7 @@ def initialize(channel_proxy, async_api_channel_item) # @param [Hash] publish_bindings AsyncAPI HTTP message bindings # @return [Faraday::Response] response def publish(payload: nil, publish_bindings: {}, headers: {}) + delay_message_options = delay_message_options(headers.delete(:delay_options) || {}) faraday_publish_bindings = sanitize_bindings(publish_bindings) faraday_publish_bindings[:headers] = (faraday_publish_bindings[:headers] || {}).merge(headers) text_payload = @@ -96,6 +97,15 @@ def publish(payload: nil, publish_bindings: {}, headers: {}) @channel_proxy.enqueue(response) logger.debug 'FaradayRequest#publish response enqueued.' response + + rescue *delay_message_options[:retry_exceptions] => e + EventSource::Operations::EnqueueDelayedMessage.new.call(payload: payload, proxy: self, delay_options: delay_message_options) + end + + def delay_message_options(delay_options) + channel_proxy = self.channel_proxy + connection_proxy = channel_proxy.connection_proxy + options = connection_proxy.server.to_h[:delayed_queue].merge(delay_options) end def attach_payload_correlation_id(response, text_payload, payload) From e3e92b772fa4d64cd9b865af9370f8990c681cd3 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Tue, 10 May 2022 11:19:25 -0400 Subject: [PATCH 08/11] removed server configuration changes --- lib/event_source/configure/servers.rb | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/lib/event_source/configure/servers.rb b/lib/event_source/configure/servers.rb index 3a9cc9f9..0f14a865 100644 --- a/lib/event_source/configure/servers.rb +++ b/lib/event_source/configure/servers.rb @@ -19,13 +19,6 @@ def to_h end end - DelayConfiguration = Struct.new(:retry_delay, :retry_limit, :retry_exceptions, :event_name, :publisher, :call_location) do - def to_h - attribute_hash = super() - attribute_hash.compact - end - end - ClientCertificateConfiguration = Struct.new(:client_certificate, :client_key, :client_key_password, :call_location) do def to_h attribute_hash = super() @@ -36,7 +29,7 @@ def to_h AmqpConfiguration = Struct.new(:protocol, :ref, :host, :vhost, :port, :url, :user_name, :password, :call_location, :default_content_type) HttpConfiguration = Struct.new(:protocol, :ref, :host, :port, :url, :user_name, :password, :soap_settings, :client_certificate_settings, - :delayed_queue_settings, :call_location, :default_content_type) do + :call_location, :default_content_type) do def soap s_settings = SoapConfiguration.new s_settings.call_location = caller(1) @@ -51,13 +44,6 @@ def client_certificate self.client_certificate_settings = cc_settings end - def delayed_queue - delay_settings = DelayConfiguration.new - delay_settings.call_location = caller(1) - yield(delay_settings) - self.delayed_queue_settings = delay_settings - end - def soap? soap_settings.present? end @@ -71,10 +57,8 @@ def to_h main_hash = attribute_hash.compact main_hash.delete(:soap_settings) main_hash.delete(:client_certificate_settings) - main_hash.delete(:delayed_queue_settings) main_hash = main_hash.merge({ soap: soap_settings.to_h }) if soap? main_hash = main_hash.merge({ client_certificate: client_certificate_settings.to_h }) if client_certificate_settings - main_hash = main_hash.merge({ delayed_queue: delayed_queue_settings.to_h }) if delayed_queue_settings main_hash end end From 915667ba2519e73283f7ecae174e6fa0f97085e8 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Tue, 10 May 2022 11:21:27 -0400 Subject: [PATCH 09/11] Updated delayed message enqueue and handlers to support retry_count --- lib/event_source/operations/delayed_message_handler.rb | 5 +++-- lib/event_source/operations/enqueue_delayed_message.rb | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/event_source/operations/delayed_message_handler.rb b/lib/event_source/operations/delayed_message_handler.rb index 42d2f35d..734fe1a6 100644 --- a/lib/event_source/operations/delayed_message_handler.rb +++ b/lib/event_source/operations/delayed_message_handler.rb @@ -25,9 +25,10 @@ def fetch_headers(metadata) Success(headers) end + # Add retry_count def execute(payload, headers) - headers[:retry_limit] = (headers[:retry_limit].to_i - 1) - headers[:publisher].constantize.new.call(payload, {delay_options: headers}) + headers[:retry_count] += 1 + headers[:retry_service].constantize.new.call(payload, {delay_options: headers}) Success(true) end diff --git a/lib/event_source/operations/enqueue_delayed_message.rb b/lib/event_source/operations/enqueue_delayed_message.rb index 4ede7c26..94f28ad6 100644 --- a/lib/event_source/operations/enqueue_delayed_message.rb +++ b/lib/event_source/operations/enqueue_delayed_message.rb @@ -11,8 +11,8 @@ class EnqueueDelayedMessage include EventSource::Logging def call(params) - delay_options = yield validate_retry_limit(params[:delay_options]) - event = yield build_event(params[:payload], delay_options) + delay_options = yield validate_retry_count(params[:delay_options]) + event = yield build_event(params[:payload], delay_options) result = yield publish_message(event) Success(result) @@ -20,8 +20,8 @@ def call(params) private - def validate_retry_limit(delay_options) - if delay_options[:retry_limit] <= 0 + def validate_retry_count(delay_options) + if delay_options[:retry_count] > delay_options[:retry_limit] logger.info("Enqueue Delayed message failed, due to remaining retry count is #{delay_options[:retry_limit]}") Failure("retry limit reached. enqueue failed!!") else From 306fa15fc4933a9ac33b2293d16f928fb71cbc6c Mon Sep 17 00:00:00 2001 From: Raghuram Date: Tue, 10 May 2022 11:23:03 -0400 Subject: [PATCH 10/11] updated publisher to accept additional options --- lib/event_source/publisher.rb | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index 5032a0b1..c4ac5d8c 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -14,7 +14,7 @@ class Publisher < Module # publisher (for example: amqp) # @attr_reader [String] exchange name of the Exchange where event # messages are published - attr_reader :protocol, :publisher_key + attr_reader :protocol, :publisher_key, :options # Internal publisher registry, which is used to identify them globally # @@ -26,23 +26,25 @@ def self.publisher_container @publisher_container ||= Concurrent::Map.new end - def self.[](exchange_ref) + def self.[](exchange_ref, **options) # TODO: validate publisher already exists # raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id) - new(exchange_ref.first[0], exchange_ref.first[1]) + new(exchange_ref.first[0], exchange_ref.first[1], options) end # @api private - def initialize(protocol, publisher_key) + def initialize(protocol, publisher_key, options) super() @protocol = protocol @publisher_key = publisher_key + @options = options[:options] || {} end def included(base) self.class.publisher_container[base] = { publisher_key: publisher_key, - protocol: protocol + protocol: protocol, + options: options } base.extend(ClassMethods) @@ -66,7 +68,7 @@ def publish(event) logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}" publish_operation = find_publish_operation_for(publish_operation_name) - publish_operation.call(event.payload, {headers: event.headers}) + publish_operation.call(event.payload, {headers: event.headers.merge(publish_options: options)}) end def channel_name @@ -124,6 +126,10 @@ def protocol EventSource::Publisher.publisher_container[self][:protocol] end + def options + EventSource::Publisher.publisher_container[self][:options] + end + def logger EventSourceLogger.new.logger end From 41a12607ca510192ba7f4fb555a66fe305cdede4 Mon Sep 17 00:00:00 2001 From: Raghuram Date: Tue, 10 May 2022 11:25:03 -0400 Subject: [PATCH 11/11] updated faraday request and bunny exechange proxies to handle publish_options inside headers --- .../protocols/amqp/bunny_exchange_proxy.rb | 1 + .../protocols/http/faraday_request_proxy.rb | 28 ++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 83c41e11..70bb79ac 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -45,6 +45,7 @@ def bunny_exchange_for(bindings) # @param [Hash] publish_bindings # @param [Hash] headers def publish(payload:, publish_bindings:, headers: {}) + publish_options = headers.delete(:publish_options) || {} bunny_publish_bindings = sanitize_bindings((publish_bindings || {}).to_h) bunny_publish_bindings[:correlation_id] = headers.delete(:correlation_id) if headers[:correlation_id] bunny_publish_bindings[:headers] = headers unless headers.empty? diff --git a/lib/event_source/protocols/http/faraday_request_proxy.rb b/lib/event_source/protocols/http/faraday_request_proxy.rb index ca5a668d..aae2a03e 100644 --- a/lib/event_source/protocols/http/faraday_request_proxy.rb +++ b/lib/event_source/protocols/http/faraday_request_proxy.rb @@ -41,6 +41,12 @@ class FaradayRequestProxy attr_reader :channel_proxy, :subject, :name, :channel_item, :connection + PUBLISH_OPTION_DEFAULTS = { + interval: 1000, + max: 3, + retry_exceptions: [StandardError, SystemStackError, Faraday::TimeoutError].freeze + } + # @param channel_proxy [EventSource::Protocols::Http::FaradayChannelProxy] Http Channel proxy # @param async_api_channel_item [Hash] channel_bindings Channel definition and bindings # @return [Faraday::Request] @@ -74,7 +80,7 @@ def initialize(channel_proxy, async_api_channel_item) # @param [Hash] publish_bindings AsyncAPI HTTP message bindings # @return [Faraday::Response] response def publish(payload: nil, publish_bindings: {}, headers: {}) - delay_message_options = delay_message_options(headers.delete(:delay_options) || {}) + delay_message_options = delay_message_options_for(headers) faraday_publish_bindings = sanitize_bindings(publish_bindings) faraday_publish_bindings[:headers] = (faraday_publish_bindings[:headers] || {}).merge(headers) text_payload = @@ -97,15 +103,23 @@ def publish(payload: nil, publish_bindings: {}, headers: {}) @channel_proxy.enqueue(response) logger.debug 'FaradayRequest#publish response enqueued.' response - rescue *delay_message_options[:retry_exceptions] => e - EventSource::Operations::EnqueueDelayedMessage.new.call(payload: payload, proxy: self, delay_options: delay_message_options) + if delay_message_options[:event_name] + EventSource::Operations::EnqueueDelayedMessage.new.call(payload: payload, proxy: self, delay_options: delay_message_options) + end end - def delay_message_options(delay_options) - channel_proxy = self.channel_proxy - connection_proxy = channel_proxy.connection_proxy - options = connection_proxy.server.to_h[:delayed_queue].merge(delay_options) + def delay_message_options_for(headers) + options = headers.delete(:publish_options) || {} + delay_defaults = PUBLISH_OPTION_DEFAULTS.merge(options[:retry_delay] || {}) + message_delay_defaults = { + :'x-delay' => delay_defaults[:interval], + :retry_limit => delay_defaults[:max], + :retry_count => 0 + }.merge(delay_default.slice(:event_name, :retry_service, :retry_exceptions)) + + message_delay_options = headers.delete(:delay_options) + message_delay_defaults.merge(message_delay_options) end def attach_payload_correlation_id(response, text_payload, payload)