From cb3f652853b29935844c42374a2d692b747d4636 Mon Sep 17 00:00:00 2001 From: Salmaan Pehlari Date: Thu, 8 Mar 2018 10:03:48 -0800 Subject: [PATCH 1/4] add setex and zrangebyscore --- lib/logstash/filters/cache_redis.rb | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/lib/logstash/filters/cache_redis.rb b/lib/logstash/filters/cache_redis.rb index 496fb5d..6e5858b 100644 --- a/lib/logstash/filters/cache_redis.rb +++ b/lib/logstash/filters/cache_redis.rb @@ -19,6 +19,13 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # For now only working for rpushnx and llen! config :cmd_key_is_formatted, :validate => :boolean, :default => false + + # expire time for SETEX in seconds + config :ttl, :validate => :number + + # Options for zrangebyscore + config :min_value, :validate => :number + config :max_value, :validate => :number # The hostname(s) of your Redis server(s). Ports may be specified on any # hostname, which will override the global port config. @@ -62,6 +69,8 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base config :get, :validate => :string config :set, :validate => :string + + config :setex, :validate => :string config :exists, :validate => :string @@ -108,6 +117,9 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # config :get, :validate => :boolean, :default => false # O(N) config :lget, :validate => :string + + config :zrangebyscore, :validate => :string + public @@ -139,6 +151,10 @@ def filter(event) if @set @redis.set(event.get(@set), event.get(@source)) end + + if @setex + @redis.setex(event.get(@setex), event.get(@ttl), event.get(@source)) + end if @exists @redis.exists(event.get(@exists)) @@ -210,6 +226,12 @@ def filter(event) if @lget event.set(@target, @redis.lrange(event.get(@lget), 0, -1)) end + + if @zrangebyscore + @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value), ":limit => [0, 1]")) + end + + rescue => e @logger.warn("Failed to send event to Redis, retrying after #{@reconnect_interval} seconds...", :event => event, From 7766bc2c51956673a03ce5598acaa4081684c447 Mon Sep 17 00:00:00 2001 From: Salmaan Pehlari Date: Mon, 12 Mar 2018 17:25:29 -0700 Subject: [PATCH 2/4] add SETEX and ZRANGEBYSCORE --- lib/logstash/filters/cache_redis.rb | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/logstash/filters/cache_redis.rb b/lib/logstash/filters/cache_redis.rb index 6e5858b..5f82d43 100644 --- a/lib/logstash/filters/cache_redis.rb +++ b/lib/logstash/filters/cache_redis.rb @@ -21,11 +21,13 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base config :cmd_key_is_formatted, :validate => :boolean, :default => false # expire time for SETEX in seconds - config :ttl, :validate => :number + config :expiration, :validate => :number, :default => 600 # Options for zrangebyscore - config :min_value, :validate => :number - config :max_value, :validate => :number + config :min_value, :validate => :string + config :max_value, :validate => :string + config :limit_min, :validate => :number, :default => 0 + config :limit_max, :validate => :number, :default => 1 # The hostname(s) of your Redis server(s). Ports may be specified on any # hostname, which will override the global port config. @@ -153,7 +155,9 @@ def filter(event) end if @setex - @redis.setex(event.get(@setex), event.get(@ttl), event.get(@source)) + #@redis.set(event.get(@setex), event.get(@source)) + #@redis.expire(event.get(@setex), @expiration) + @redis.setex(event.get(@setex), @expiration, event.get(@source)) end if @exists @@ -228,7 +232,10 @@ def filter(event) end if @zrangebyscore - @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value), ":limit => [0, 1]")) + #@logger.warn((@redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value))).inspect) + #event.set(@target, @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value))) + event.set(@target, @redis.zrangebyscore(@zrangebyscore, @min_value, @max_value, :limit => [@limit_min,@limit_max])) + #@logger.warn((@redis.zrangebyscore(@zrangebyscore, @min_value, "+inf", :limit => [0,1])).inspect) end From f721dacd06cc97c6bfc2163dba51da021a9889ba Mon Sep 17 00:00:00 2001 From: Salmaan Pehlari Date: Mon, 12 Mar 2018 18:03:17 -0700 Subject: [PATCH 3/4] cleanup --- lib/logstash/filters/cache_redis.rb | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/lib/logstash/filters/cache_redis.rb b/lib/logstash/filters/cache_redis.rb index 5f82d43..56b0614 100644 --- a/lib/logstash/filters/cache_redis.rb +++ b/lib/logstash/filters/cache_redis.rb @@ -23,11 +23,11 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # expire time for SETEX in seconds config :expiration, :validate => :number, :default => 600 - # Options for zrangebyscore + # Options for ZRANGEBYSCORE config :min_value, :validate => :string config :max_value, :validate => :string - config :limit_min, :validate => :number, :default => 0 - config :limit_max, :validate => :number, :default => 1 + config :offset, :validate => :number, :default => 0 + config :count, :validate => :number, :default => 1 # The hostname(s) of your Redis server(s). Ports may be specified on any # hostname, which will override the global port config. @@ -155,8 +155,6 @@ def filter(event) end if @setex - #@redis.set(event.get(@setex), event.get(@source)) - #@redis.expire(event.get(@setex), @expiration) @redis.setex(event.get(@setex), @expiration, event.get(@source)) end @@ -232,10 +230,7 @@ def filter(event) end if @zrangebyscore - #@logger.warn((@redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value))).inspect) - #event.set(@target, @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value))) - event.set(@target, @redis.zrangebyscore(@zrangebyscore, @min_value, @max_value, :limit => [@limit_min,@limit_max])) - #@logger.warn((@redis.zrangebyscore(@zrangebyscore, @min_value, "+inf", :limit => [0,1])).inspect) + event.set(@target, @redis.zrangebyscore(@zrangebyscore, @min_value, @max_value, :limit => [@offset,@count])) end From 8f1518503647ab2360bffda177411e8a2950cb00 Mon Sep 17 00:00:00 2001 From: Salmaan Pehlari Date: Mon, 12 Mar 2018 19:49:37 -0700 Subject: [PATCH 4/4] changes to dynamic fields --- lib/logstash/filters/cache_redis.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/filters/cache_redis.rb b/lib/logstash/filters/cache_redis.rb index 56b0614..c153a47 100644 --- a/lib/logstash/filters/cache_redis.rb +++ b/lib/logstash/filters/cache_redis.rb @@ -25,7 +25,7 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # Options for ZRANGEBYSCORE config :min_value, :validate => :string - config :max_value, :validate => :string + config :max_value, :validate => :string, :default => "+inf" config :offset, :validate => :number, :default => 0 config :count, :validate => :number, :default => 1 @@ -230,7 +230,7 @@ def filter(event) end if @zrangebyscore - event.set(@target, @redis.zrangebyscore(@zrangebyscore, @min_value, @max_value, :limit => [@offset,@count])) + event.set(@target, @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value), :limit => [@offset,@count])) end