Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ end

--- Release all session tasks.
local function release_session_tasks(session_uuid)
if box.info.ro then
return
end
Comment on lines +482 to +484
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

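What happens in this sequence?

  1. Instance -> RO.
  2. Disconnect the session.
  3. Instance -> RW.

It looks like the session will still be in the space: the cleanup is skipped while the instance is RO, and nothing visible here repeats it once the instance is RW again.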


local taken_tasks = box.space._queue_taken_2.index.uuid:select{session_uuid}

for _, task in pairs(taken_tasks) do
Expand All @@ -501,26 +505,25 @@ end

function method._on_consumer_disconnect()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here.

local conn_id = connection.id()
local consumers = box.space._queue_consumers

-- wakeup all waiters
while true do
local waiter = box.space._queue_consumers.index.pk:min{conn_id}
if waiter == nil then
break
end
-- Don't touch the other consumers
if waiter[1] ~= conn_id then
break
end
box.space._queue_consumers:delete{waiter[1], waiter[2]}
local cond = conds[waiter[2]]
for _, waiter in consumers.index.pk:pairs(conn_id, { iterator = 'EQ' }) do
local fid = waiter[2]
local cond = conds[fid]
if cond then
releasing_connections[waiter[2]] = true
cond:signal(waiter[2])
releasing_connections[fid] = true
cond:signal(fid)
end

if not box.info.ro then
consumers:delete{waiter[1], waiter[2]}
end
end

session.disconnect(conn_id)
if not box.info.ro then
session.disconnect(conn_id)
end
end

-- function takes tuples and recreates tube
Expand Down
136 changes: 136 additions & 0 deletions t/240-ro-on-disconnect.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#!/usr/bin/env tarantool

local log = require('log')
local tnt = require('t.tnt')
local test = require('tap').test('')
local fiber = require('fiber')
local queue = require('queue')

-- The scenarios below are only run on Tarantool >= 2.4.1. On older versions
-- the script logs a note and returns before any TAP plan has been declared.
local qc = require('queue.compat')
if not qc.check_version({2, 4, 1}) then
log.info('Tests skipped, tarantool version < 2.4.1')
return
end

-- Publish the queue module as a global: the remote clients in the subtests
-- address it by name (e.g. 'queue.identify', 'queue.tube.test_rel:take').
-- It is removed again at the end of the script.
rawset(_G, 'queue', require('queue'))

-- Internal modules, required directly so the subtests can invoke the
-- session-removal callback and poll the queue state.
local session = require('queue.abstract.queue_session')
local queue_state = require('queue.abstract.queue_state')

-- Three subtests follow.
test:plan(3)

-- Subtest: queue._on_consumer_disconnect() must not raise while the instance
-- is read-only.
--
-- Scenario:
--   1. Configure the cluster and the queue, create a tube.
--   2. Park a fiber in take() so a waiter shows up in _queue_consumers.
--   3. Switch the instance to read-only and call the disconnect handler.
--   4. Switch back to read-write and let the queue state settle.
test:test('on_disconnect handler must be RO-safe', function(test)
    test:plan(6)

    tnt.cluster.cfg{}
    test:ok(tnt.cluster.wait_replica(), 'wait for replica to connect')

    queue.cfg{ttr = 0.5, in_replicaset = true}
    local tube = queue.create_tube('test_ro_disc', 'fifo', {if_not_exists = true})
    test:ok(tube, 'tube created')

    -- The tube is empty, so this take() has nothing to return and the fiber
    -- is expected to stay registered as a waiting consumer.
    local f = fiber.new(function()
        queue.tube.test_ro_disc:take(3600)
    end)
    f:name('queue_waiter_fiber')

    -- Poll (300 attempts, 0.01 s apart) until the waiter is visible in the
    -- _queue_consumers space.
    local ok = false
    for _ = 1, 300 do
        if box.space._queue_consumers:count() > 0 then
            ok = true
            break
        end
        fiber.sleep(0.01)
    end
    test:ok(ok, 'waiter registered in _queue_consumers')

    box.cfg{read_only = true}
    test:ok(box.info.ro, 'instance is RO')

    -- pcall() turns an error raised by the handler into a failed assertion
    -- instead of aborting the whole script.
    local ok_call, err = pcall(queue._on_consumer_disconnect)
    test:ok(ok_call, ('_on_consumer_disconnect() must not fail on RO, err = %s'):format(tostring(err)))

    box.cfg{read_only = false}
    test:ok(not box.info.ro, 'instance back to RW')

    -- Cleanup: the RO -> RW flip may have moved the queue out of the RUNNING
    -- state. Wait for it to come back so the following subtest does not race
    -- with the transition when it uses the queue API.
    -- NOTE(review): presumably queue calls are rejected outside RUNNING when
    -- in_replicaset is set -- confirm against queue_state.
    queue_state.poll(queue_state.states.RUNNING, 10)
end)

-- Subtest: invoking the session-removal callback on a read-only instance
-- must neither raise nor touch the session's record in _queue_taken_2.
test:test('release_session_tasks: RO-safe', function(t)
    t:plan(10)

    -- Number of _queue_taken_2 records that belong to the given session.
    local function taken_count(uuid)
        local records = box.space._queue_taken_2.index.uuid:select{uuid}
        return #records
    end

    local created = queue.create_tube('test_rel', 'fifo', {if_not_exists = true})
    t:ok(created, 'tube created')

    -- Open a session and take a task through it, so that _queue_taken_2
    -- holds a record for this session.
    local conn = tnt.cluster.connect_master()
    t:ok(conn.error == nil, 'client connected')

    local uuid = conn:call('queue.identify')
    t:ok(uuid ~= nil, 'got session_uuid')

    t:ok(queue.tube.test_rel:put('data'), 'put task')
    local taken = conn:call('queue.tube.test_rel:take')
    t:ok(taken ~= nil, 'task taken')

    t:is(taken_count(uuid), 1, '_queue_taken_2 has 1 record before')

    -- Go read-only and wait until the queue reports the WAITING state.
    box.cfg{read_only = true}
    local waiting = queue_state.poll(queue_state.states.WAITING, 10)
    t:ok(waiting, 'state WAITING')
    t:ok(box.info.ro, 'instance is RO')

    local succeeded, failure = pcall(session._on_session_remove, uuid)
    t:ok(succeeded, string.format('on_session_remove does not fail on RO, err=%s', tostring(failure)))

    t:is(taken_count(uuid), 1, '_queue_taken_2 unchanged on RO')

    -- Cleanup: return to read-write, wait for RUNNING, drop the client.
    box.cfg{read_only = false}
    queue_state.poll(queue_state.states.RUNNING, 10)
    conn:close()
end)

-- Subtest: on a read-write instance the session-removal callback must drop
-- the session's _queue_taken_2 record and make the task takeable again.
test:test('release_session_tasks: works on RW', function(t)
    t:plan(11)

    -- Number of _queue_taken_2 records that belong to the given session.
    local function taken_count(uuid)
        local records = box.space._queue_taken_2.index.uuid:select{uuid}
        return #records
    end

    -- Make sure the instance is writable and the queue is RUNNING first.
    box.cfg{read_only = false}
    queue_state.poll(queue_state.states.RUNNING, 10)
    t:ok(not box.info.ro, 'instance is RW')

    queue.cfg{ttr = 0.5, in_replicaset = true}
    local created = queue.create_tube('test_rel2', 'fifo', {if_not_exists = true})
    t:ok(created, 'tube created')

    local conn = tnt.cluster.connect_master()
    t:ok(conn.error == nil, 'client connected')

    local uuid = conn:call('queue.identify')
    t:ok(uuid ~= nil, 'got session_uuid')

    t:ok(queue.tube.test_rel2:put('data2'), 'put task')
    local taken = conn:call('queue.tube.test_rel2:take')
    t:ok(taken ~= nil, 'task taken')

    t:is(taken_count(uuid), 1, '_queue_taken_2 has 1 record before')

    -- Invoke the callback directly instead of waiting for a real disconnect.
    local succeeded, failure = pcall(session._on_session_remove, uuid)
    t:ok(succeeded, string.format('on_session_remove ok on RW, err=%s', tostring(failure)))

    -- The taken record must be gone ...
    t:is(taken_count(uuid), 0, '_queue_taken_2 record removed')

    -- ... and a non-blocking take (timeout 0) must hand the same task out again.
    local retaken = conn:call('queue.tube.test_rel2:take', {0})
    t:ok(retaken ~= nil, 'task is takeable again after release')
    t:is(retaken[3], 'data2', 'task data preserved')

    conn:close()
end)

-- Teardown: remove the global published for the remote clients, stop the test
-- environment, and turn the TAP verdict into the process exit status
-- (0 when test:check() is truthy, 1 otherwise).
rawset(_G, 'queue', nil)
tnt.finish()

local exit_code = 1
if test:check() then
    exit_code = 0
end
os.exit(exit_code)
-- vim: set ft=lua :
Loading