diff --git a/lib/logstash/filters/cache_redis.rb b/lib/logstash/filters/cache_redis.rb index 496fb5d..c153a47 100644 --- a/lib/logstash/filters/cache_redis.rb +++ b/lib/logstash/filters/cache_redis.rb @@ -19,6 +19,15 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # For now only working for rpushnx and llen! config :cmd_key_is_formatted, :validate => :boolean, :default => false + + # expire time for SETEX in seconds + config :expiration, :validate => :number, :default => 600 + + # Options for ZRANGEBYSCORE + config :min_value, :validate => :string + config :max_value, :validate => :string, :default => "+inf" + config :offset, :validate => :number, :default => 0 + config :count, :validate => :number, :default => 1 # The hostname(s) of your Redis server(s). Ports may be specified on any # hostname, which will override the global port config. @@ -62,6 +71,8 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base config :get, :validate => :string config :set, :validate => :string + + config :setex, :validate => :string config :exists, :validate => :string @@ -108,6 +119,9 @@ class LogStash::Filters::CacheRedis < LogStash::Filters::Base # config :get, :validate => :boolean, :default => false # O(N) config :lget, :validate => :string + + config :zrangebyscore, :validate => :string + public @@ -139,6 +153,10 @@ def filter(event) if @set @redis.set(event.get(@set), event.get(@source)) end + + if @setex + @redis.setex(event.get(@setex), @expiration, event.get(@source)) + end if @exists @redis.exists(event.get(@exists)) @@ -210,6 +228,12 @@ def filter(event) if @lget event.set(@target, @redis.lrange(event.get(@lget), 0, -1)) end + + if @zrangebyscore + event.set(@target, @redis.zrangebyscore(event.get(@zrangebyscore), event.get(@min_value), event.get(@max_value), :limit => [@offset,@count])) + end + + rescue => e @logger.warn("Failed to send event to Redis, retrying after #{@reconnect_interval} seconds...", :event => event,