Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated purge() and updateMetrics() LUA-scripts to be more deterministic #1501

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions src/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ public static function updateMetrics()
redis.call('sadd', KEYS[2], KEYS[1])

local hash = redis.call('hmget', KEYS[1], 'throughput', 'runtime')

local throughput = hash[1] + 1

local throughput = tonumber(hash[1]) or 0

throughput = throughput + 1

local runtime = 0

if hash[2] then
runtime = ((hash[1] * tonumber(hash[2])) + tonumber(ARGV[1])) / throughput
runtime = ((tonumber(hash[1]) * tonumber(hash[2])) + tonumber(ARGV[1])) / throughput
else
runtime = tonumber(ARGV[1])
end

redis.call('hmset', KEYS[1], 'throughput', throughput, 'runtime', runtime)
LUA;
}
Expand All @@ -50,29 +52,26 @@ public static function purge()
return <<<'LUA'

local count = 0
local cursor = 0
local batch_size = tonumber(100)
local start = tonumber(0)

repeat
-- Iterate over the recent jobs sorted set
local scanner = redis.call('zscan', KEYS[1], cursor)
cursor = scanner[1]

for i = 1, #scanner[2], 2 do
local jobid = scanner[2][i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')

-- Delete the pending/reserved jobs, that match the queue
-- name, from the sorted sets as well as the job hash
if((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
-- Use ZRANGE to get a range of elements from the sorted set in a deterministic way
local jobs = redis.call('zrange', KEYS[1], start, start + batch_size - 1)

for i = 1, #jobs do
local jobid = jobs[i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')

-- Delete the pending/reserved jobs that match the queue name
if ((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
until cursor == '0'

end
return count
LUA;
}
Expand Down