From e6e3fa45ebe0fd3052f6d4b12956a79022c23a6b Mon Sep 17 00:00:00 2001 From: Jon Calvert Date: Tue, 5 Jan 2016 11:20:43 -0500 Subject: [PATCH 1/2] Add a failing, somewhat contrived spec to demonstrate EXISTS keys being orphaned. We share redis clients between threads (redis obj is threadsafe) and because of this the 2nd thread will be blocking, waiting for the AVAILABLE key and thus the first thread is unable to finish cleanup. Eventually the AVAILABLE key expires but the EXISTS key does not. The failure to succeed with the signal() call could be due to loss of network connection, process crash, etc. Leaving an unexpiring EXISTS key but no AVAILABLE key means that a retry will believe the semaphore exists and wait for the AVAILABLE which never comes. --- spec/semaphore_spec.rb | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/spec/semaphore_spec.rb b/spec/semaphore_spec.rb index 4d7e0b1..6e708c1 100644 --- a/spec/semaphore_spec.rb +++ b/spec/semaphore_spec.rb @@ -160,6 +160,31 @@ sleep 3.0 expect(@redis.keys.count).to eq(original_key_size) end + + it "does not leave an exists key without an expiration when an expiration is specified" do + original_key_size = @redis.keys.count + queue = Queue.new + threads = 2.times.collect do + Thread.new do + Redis::Semaphore.new(:my_semaphore, :redis => @redis, :expiration => 3).lock(5) do + sleep 1 + end + end + end + sleep 4.0 + @redis2 = Redis.new :db => 15 + threads.each{|t| t.kill } #ensure signal step fails + expect(@redis2.ttl("SEMAPHORE:my_semaphore:EXISTS").to_s).to_not eql("-1") + sleep 4.0 #allow blpop timeout to occur + thrd = Thread.new do + Redis::Semaphore.new(:my_semaphore, :redis => @redis2, :expiration => 3).lock(5) do + queue << "work" + end + end + thrd.join(3) + expect(queue.size).to eql(1) + expect(@redis.ttl("SEMAPHORE:my_semaphore:EXISTS").to_s).to_not eql("-1") + end end describe "semaphore without staleness checking" do From 6ac21c3b2fc4eb39cea19669583b2bb24b5d1117 Mon Sep 17 00:00:00 2001 From: Jon Calvert Date: Tue, 5 Jan 2016 13:14:35 -0500 Subject: [PATCH 2/2] Remove the extraneous expire during create, since the subsequent persist will obviate it anyway and since keys can expire during a MULTI transaction anyway. Return the removed statement where we always reset the key expiration prior to attempting to obtain a lock. This prevents the orphaned EXISTS key that never expires. --- lib/redis/semaphore.rb | 5 +---- spec/semaphore_spec.rb | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/redis/semaphore.rb b/lib/redis/semaphore.rb index 5f7c639..11eaf62 100644 --- a/lib/redis/semaphore.rb +++ b/lib/redis/semaphore.rb @@ -36,7 +36,7 @@ def exists_or_create! if token == API_VERSION && @redis.get(version_key).nil? @redis.set(version_key, API_VERSION) end - + set_expiration_if_necessary true end end @@ -163,8 +163,6 @@ def simple_mutex(key_name, expires = nil) end def create! - @redis.expire(exists_key, 10) - @redis.multi do @redis.del(grabbed_key) @redis.del(available_key) @@ -173,7 +171,6 @@ def create! end @redis.set(version_key, API_VERSION) @redis.persist(exists_key) - set_expiration_if_necessary end end diff --git a/spec/semaphore_spec.rb b/spec/semaphore_spec.rb index 6e708c1..3a36771 100644 --- a/spec/semaphore_spec.rb +++ b/spec/semaphore_spec.rb @@ -161,23 +161,25 @@ expect(@redis.keys.count).to eq(original_key_size) end - it "does not leave an exists key without an expiration when an expiration is specified" do - original_key_size = @redis.keys.count + it "does not leave a key without expiration if expiration given" do queue = Queue.new - threads = 2.times.collect do + threads = Array.new(2) do Thread.new do - Redis::Semaphore.new(:my_semaphore, :redis => @redis, :expiration => 3).lock(5) do + opts = { redis: @redis, expiration: 3 } + Redis::Semaphore.new(:my_semaphore, opts).lock(5) do sleep 1 end end end sleep 4.0 - @redis2 = Redis.new :db => 15 - threads.each{|t| t.kill } #ensure signal step fails - expect(@redis2.ttl("SEMAPHORE:my_semaphore:EXISTS").to_s).to_not eql("-1") - sleep 4.0 #allow blpop timeout to occur + @redis2 = Redis.new db: 15 + threads.each(&:kill) # ensure signal step fails + exist_key = @redis2.ttl("SEMAPHORE:my_semaphore:EXISTS").to_s + expect(exist_key).to_not eql("-1") + sleep 4.0 # allow blpop timeout to occur thrd = Thread.new do - Redis::Semaphore.new(:my_semaphore, :redis => @redis2, :expiration => 3).lock(5) do + opts = { redis: @redis2, expiration: 3 } + Redis::Semaphore.new(:my_semaphore, opts).lock(5) do queue << "work" end end