Blocked queries #52

Open
vschiavoni opened this issue Nov 13, 2015 · 3 comments

@vschiavoni
Member

There are scenarios where the concurrency level allowed by the Splay runtime is not sufficient.
One of them is the execution of the following protocol, T-Kad, a gossip-based construction of the KAD DHT.
The problem occurs when the protocol is deployed over a cluster of 600 splayds using exactly 600 nodes. Each node issues 500 queries more or less concurrently.

The attached plot gantt.pdf shows that queries (on the y-axis) get slower and slower (longer blue bars on the x-axis).

We might need a simpler test case to identify the bottleneck and possibly optimise the runtime.

-------------------------------------------------------------------------------
-- modules
-------------------------------------------------------------------------------

require"splay.base"
rpc = require"splay.rpc"
bit = require"bit"
rpc.l_o.level=1
misc = require "splay.misc"
crypto = require "crypto"

-- addition to allow local run
PARAMS={}
local cmd_line_args=nil
if not job then --outside the sandbox
    if #arg < 2 then  
        print("lua ", arg[0], " my_position nb_nodes")  
        os.exit()  
    else        
        local pos, total = tonumber(arg[1]), tonumber(arg[2])  
        local utils = require("splay.utils")
        job = utils.generate_job(pos, total, 20001) 
        cmd_line_args=arg[3]    
    end
end

if arg~=nil then
    if cmd_line_args==nil then cmd_line_args=arg[1] end
    if cmd_line_args~=nil and cmd_line_args~="" then
        print("ARGS: ",cmd_line_args)   
        for _,v in pairs(misc.split(cmd_line_args,":")) do
            local t=misc.split(v,"=")
            PARAMS[t[1]]=t[2]
        end
    end
end


rpc.server(job.me.port)

-------------------------------------------------------------------------------
-- current node
-------------------------------------------------------------------------------
-- 31 bit is currently the maximal id space: BitOp library provides operations only in the range of signed 32 bit numbers
bits = 31
-- keep the first floor(bits/4) = 7 hex digits of the SHA-1 digest, i.e. a 28-bit id
function compute_id(o) return string.sub(crypto.evp.new("sha1"):digest(o), 1, math.floor(bits / 4)) end

me = {}
me.peer = job.me
me.age = 0
me.id = compute_id(job.me.ip..job.me.port)

function num(k)
    if k.id then return tonumber("0x"..k.id) else return tonumber("0x"..k) end
end

-------------------------------------------------------------------------------
-- parameters
-------------------------------------------------------------------------------

max_time = 800

--T-KAD params
TKAD_MESSAGE = tonumber(PARAMS["TKAD_MESSAGE"]) or 10
TKAD_VIEW = tonumber(PARAMS["TKAD_VIEW"]) or 10
GOSSIP_TIME = tonumber(PARAMS["GOSSIP_TIME"]) or 7
K_SIZE = tonumber(PARAMS["K_SIZE"]) or 3
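-- PARAMS values are strings, and any string (even "false") is truthy in Lua, so compare explicitly
TKAD_CONVERGE = PARAMS["TKAD_CONVERGE"] == "true"
TKAD_LOOKUP = PARAMS["TKAD_LOOKUP"] ~= "false"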
PARTNER_SELECT = tonumber(PARAMS["PARTNER_SELECT"]) or 0

--PSS params
PSS_VIEW_SIZE =tonumber(PARAMS["PSS_VIEW_SIZE"]) or 10
PSS_SHUFFLE_SIZE =  tonumber(PARAMS["PSS_SHUFFLE_SIZE"]) or math.floor(PSS_VIEW_SIZE / 2 + 0.5)
PSS_SHUFFLE_PERIOD = tonumber(PARAMS["PSS_SHUFFLE_PERIOD"]) or 10

bytesSent = 0
bytesReceived = 0


-- ############################################################################
--  Peer Sampling Service
-- ############################################################################

PSS = {

    view = {},
    view_copy = {},
    c = PSS_VIEW_SIZE,
    exch = PSS_SHUFFLE_SIZE,
    S = math.floor(PSS_VIEW_SIZE/ 2 + 0.5),
    H = 0,
    SEL = "rand", -- could also be "tail"
    view_copy_lock = events.lock(),

    -- utilities
    print_table = function(t)
        print("[ (size "..#t..")")
        for i=1,#t do
            print("  "..i.." : ".."["..t[i].peer.ip..":"..t[i].peer.port.."] - age: "..t[i].age.." - id: "..t[i].id)
        end
        print("]")
    end,

    set_of_peers_to_string = function(v)
        local ret = ""; for i=1,#v do ret = ret..v[i].id.." " end
        return ret
    end,

    print_set_of_peers = function(v,message)    
        if message then log:print(message) end
        log:print(PSS.set_of_peers_to_string(v))
    end,

    print_view = function(message)
        if message then log:print(message) end
        log:print("PSS VIEW_CONTENT "..job.position.." "..PSS.set_of_peers_to_string(PSS.view))
    end,

    -- peer sampling functions

    pss_selectPartner= function()
        if #PSS.view > 0 then
            if PSS.SEL == "rand" then return math.random(#PSS.view) end
            if PSS.SEL == "tail" then
                local ret_ind = -1 ; local ret_age = -1
                for i,p in pairs(PSS.view) do
                    if (p.age > ret_age) then ret_ind = i;ret_age=p.age end
                end
                assert (not (ret_ind == -1))
                return ret_ind
            end
        else
            return false
        end
    end,

    same_peer_but_different_ages = function(a,b)
        return a.peer.ip == b.peer.ip and a.peer.port == b.peer.port
    end,

    same_peer = function(a,b)
        return PSS.same_peer_but_different_ages(a,b) and a.age == b.age
    end,

    pss_selectToSend = function()
        -- create a new return buffer
        local toSend = {}
        -- append the local node view age 0
        table.insert(toSend,{peer={ip=job.me.ip,port=job.me.port},age=0,id=me.id})
        -- shuffle view
        PSS.view = misc.shuffle(PSS.view)
        -- move oldest H items to the end of the view
        --- 1. copy the view
        local tmp_view = misc.dup(PSS.view)
        --- 2. sort the items based on the age
        table.sort(tmp_view,function(a,b) return a.age < b.age end)
        --- 3. get the H largest aged elements from the tmp_view, remove them from the view 
        ---    (we assume there are no duplicates in the view at this point!)
        ---    and put them at the end of the view
        for i=(#tmp_view-PSS.H+1),#tmp_view do
            local ind = -1
            for j=1,#PSS.view do
                if PSS.same_peer(tmp_view[i],PSS.view[j]) then ind=j; break end
            end
            assert (not (ind == -1))
            local elem = table.remove(PSS.view,ind)
            PSS.view[#PSS.view+1] = elem
        end

        -- append the first exch-1 elements of view to toSend
        for i=1,(PSS.exch-1) do
            toSend[#toSend+1]=PSS.view[i]
        end     

        return toSend
    end,

    pss_selectToKeep = function(received)
        local selectToKeepStart= misc.time()    
        -- concatenate the view and the received set of view items
        for j=1,#received do PSS.view[#PSS.view+1] = received[j] end

        -- remove duplicates from view
        -- note that we can't rely on sorting the table as we need its order later
        local i = 1 
        while i < #PSS.view do -- up to and including the last pair (#view-1, #view)
            for j=i+1,#PSS.view do
                if PSS.same_peer_but_different_ages(PSS.view[i],PSS.view[j]) then
                    -- delete the oldest
                    if PSS.view[i].age < PSS.view[j].age then 
                        table.remove(PSS.view,j)
                    else
                        table.remove(PSS.view,i)
                    end
                    i = i - 1 -- we need to retest for i in case there is one more duplicate
                    break
                end
            end
            i = i + 1
        end

        -- remove the min(H,#view-c) oldest items from view
        local o = math.min(PSS.H,#PSS.view-PSS.c)
        while o > 0 do
            -- brute force -- remove the oldest
            local oldest_index = -1
            local oldest_age = -1
            for i=1,#PSS.view do 
                if oldest_age < PSS.view[i].age then
                    oldest_age = PSS.view[i].age
                    oldest_index = i
                end
            end
            assert (not (oldest_index == -1))
            table.remove(PSS.view,oldest_index)
            o = o - 1
        end

        -- remove the min(S,#view-c) head items from view
        o = math.min(PSS.S,#PSS.view-PSS.c)
        while o > 0 do
            table.remove(PSS.view,1) -- not optimal
            o = o - 1
        end

        -- in the case there still are too many peers in the view, remove at random
        while #PSS.view > PSS.c do table.remove(PSS.view,math.random(#PSS.view)) end

        assert (#PSS.view <= PSS.c)
        --log:print("PSS_SELECT_TO_KEEP ", ( misc.time() - selectToKeepStart ) )        
    end,

    ongoing_at_rpc=false,

    is_init = false,

    pss_passive_thread = function(from,buffer)
        if PSS.ongoing_at_rpc or not PSS.is_init then
            return false
        end

        --PSS.print_view("passive_thread ("..job.position.."): entering")
        --PSS.print_set_of_peers(buffer,"passive_thread ("..job.position.."): received from "..from)
        local ret = PSS.pss_selectToSend()
        PSS.pss_selectToKeep(buffer)
        --PSS.print_view("passive_thread ("..job.position.."): after selectToKeep")
        return ret
    end,

    pss_send_at_rpc = function(peer,pos,buf)
        local ok, r = rpc.acall(peer,{"PSS.pss_passive_thread", pos, buf},PSS_SHUFFLE_PERIOD/2)
        return ok,r
    end,

    pss_active_thread = function()
        PSS.ongoing_at_rpc=true
        -- select a partner
        local exchange_aborted=true
        local exchange_retry=2
        for i=1,exchange_retry do --up to 2 attempts per round, re-do in case of conflict
            local partner_ind = PSS.pss_selectPartner()
            if not partner_ind then
                log:print("pss_active_thread: pss view is empty, no partner can be selected")
                -- release the flag, otherwise incoming passive requests stay rejected forever
                PSS.ongoing_at_rpc=false
                return
            end
            local partner = PSS.view[partner_ind]
            -- remove the partner from the view
            table.remove(PSS.view,partner_ind)
            -- select what to send to the partner
            local buffer = PSS.pss_selectToSend()
            --PSS.print_set_of_peers(buffer,"active_thread ("..job.position.."): sending to "..partner.id)

            -- send to the partner
            local rpcStart=misc.time()
            local ok, r = PSS.pss_send_at_rpc(partner.peer,job.position, buffer) -- rpc.acall(partner.peer,{"PSS.pss_passive_thread", job.position, buffer},PSS_SHUFFLE_PERIOD/2)
            --log:print("PSS.pss_passive_thread.RPC ",  misc.time() - rpcStart  )

            if ok then
                -- select what to keep etc.
                local received = r[1]
                if received==false then
                    log:print("PSS received false due to ongoing RPC or yet uninitialized view, will try again in a short while")
                    events.sleep(math.random()) 
                    --the call was aborted due to pending RPC at peer's node
                else
                    exchange_aborted=false 
                    --PSS.print_set_of_peers(received,"active_thread ("..job.position.."): received from "..partner.id)
                    PSS.pss_selectToKeep(received)
                    --PSS.print_view("active_thread ("..job.position.."): after selectToKeep")
                end
            else
                -- peer not replying: it was already removed from the view before the RPC,
                -- so removing index partner_ind again would evict an unrelated peer
                log:print("on peer ("..job.position..") peer "..partner.id.." did not respond -- removing it from the view")
                log:warning("PSS.pss_passive_thread RPC error:", r)
            end     
            if exchange_aborted==false then break end
        end

        PSS.view_copy_lock:lock()
        local viewCopyLock = misc.time()
        PSS.view_copy = misc.dup(PSS.view)
        --log:print("PSS_VIEW_COPY_LOCK_HELD ", ( misc.time() - viewCopyLock ) )
        PSS.view_copy_lock:unlock()
        for _,v in ipairs(PSS.view) do
                v.age = v.age+1
        end
        -- now, allow to have an incoming passive thread request
        PSS.ongoing_at_rpc=false
    end,

    -- API
    pss_getPeer = function()
        PSS.view_copy_lock:lock()
        local getPeerLockHeldStart = misc.time()

        local peer = PSS.view_copy[math.random(#PSS.view_copy)] 

        --log:print("PSS_GET_PEER_LOCK_HELD_VIEW_COPY ", ( misc.time() - getPeerLockHeldStart ) )
        PSS.view_copy_lock:unlock()

        return peer
    end,

    pss_init = function()
        -- ideally, would perform a random walk on an existing overlay
        -- but here we emerge from the void, so let's use the Splay provided peers.
        -- Ages are taken randomly in [0..c] but could be 0 as well.
        local indexes = {}
        for i=1,#job.nodes do indexes[#indexes+1]=i end
        table.remove(indexes,job.position) --remove myself
        local selected_indexes = misc.random_pick(indexes,math.min(PSS.c,#indexes)) 
        for _,v in ipairs(selected_indexes) do
                local a_peer = job.nodes[v]
                local hashed_index =  compute_id(a_peer.ip..a_peer.port)
                PSS.view[#PSS.view+1] = 
                {peer=a_peer,age=math.random(PSS.c),id=hashed_index}
        end
        PSS.view_copy = misc.dup(PSS.view)
        PSS.is_init = true
        assert (#PSS.view == math.min(PSS.c,#indexes))
        --PSS.print_view("PSS initial view")
    end,

    log_view = function()
        -- called once to log the view
        events.sleep(10.5*PSS_SHUFFLE_PERIOD)
        log:print("VIEW_CONTENT "..job.position.." "..PSS.set_of_peers_to_string(PSS.view))
    end,

}

-- ############################################################################
--  T-KAD
-- ############################################################################

TKAD = {
    view = {},
    routing_table = {},
    v = TKAD_VIEW,
    k = K_SIZE,
    m = TKAD_MESSAGE,
    view_lock = events.lock(),
    rt_lock = events.lock(),
    cycle = 0,
    c_lock = events.lock(),
    ideal_rt = {},
    keys = {},
    responsible = {},
    opt_links = 0,
    mand_links = 0,


-------------------------------------------------------------------------------
-- debug
-------------------------------------------------------------------------------

    display_view = function(v, which)
        local display = table.concat({which,"\n"})
        for i,w in ipairs(v) do
            display = table.concat({display, " ",num(w)})
        end
        log:print(display.."\n")
    end,


    display_rt = function()
        log:print("ROUTING TABLE:", num(me.id))
        for i = 0, bits do
            if TKAD.routing_table[i] and #TKAD.routing_table[i] > 0 then
                local out = ""
                for j,v in ipairs(TKAD.routing_table[i]) do
                    out = table.concat({out,num(v.id)," | "})
                end
                log:print(i, out)
            end
        end
    end,

    debug = function(c)
        log:print("TKAD cycle:", c)
        TKAD.display_rt() -- logs the table itself and returns nothing
    end,

-------------------------------------------------------------------------------
-- utilities
-------------------------------------------------------------------------------

    remove_dup = function(set)
        for i,v in ipairs(set) do
            local j = i+1
            while(j <= #set and #set > 0) do
                if v.id == set[j].id then
                    table.remove(set,j)
                else j = j + 1
                end
            end
        end
    end,

    --keep the first n elements of t
    keep_n = function(t,n)
        for i = #t, n+1, -1 do
            table.remove(t,i)
        end
    end,

    same_node = function(n1,n2)
        local peer_first
        if n1.peer then peer_first = n1.peer else peer_first = n1 end
        local peer_second
        if n2.peer then peer_second = n2.peer else peer_second = n2 end
        return peer_first.port == peer_second.port and peer_first.ip == peer_second.ip
    end,

    remove_node  = function(t, node)
        local j = 1
        for i = 1, #t do
            if TKAD.same_node(t[j],node) then table.remove(t, j)
            else j = j+1 end
        end
    end,

    --flatten a two-dimensional array
    flatten = function(t)
        local result = {}
        for i,v in ipairs(t) do
            for j,w in ipairs(v) do
                result[#result+1] = w
            end
        end
        return result
    end,

    -- computes the diff between two ids based on the number of bits in which they differ
    xor_diff = function(n,m)
        local xor_result = bit.bxor(n,m)
        local diff = 0
        while xor_result > 0 do
            diff = diff + xor_result%2
            --arithmetic right shift
            xor_result = bit.arshift(xor_result,1)
        end
        return diff
    end,

    --ranks nodes according to the number of differing bits in their IDs;
    --used for selecting TKAD peer and creating TKAD message
    xor_rank = function(set, partner)
        table.sort(set, function (a,b) return TKAD.xor_diff(num(a), num(partner)) < TKAD.xor_diff(num(b), num(partner)) end)
    end,


    xor_rank_pure = function(set, partner)
        table.sort(set, function (a,b) return bit.bxor(num(a), num(partner)) < bit.bxor(num(b), num(partner)) end)
    end,

    already_in = function(t,n)
        for i,v in ipairs(t) do
            if v.id == n.id then return i end
        end
        return -1
    end,

    --computes the number of the k-bucket for the given node
    bucket_num = function(p)
        local n = bit.bxor(num(p), num(me))
        local counter = 0   
        while n ~= 0 do
        --logical rightshift
            n = bit.rshift(n, 1)
            counter = counter+1 
        end
        return bits-counter
    end,

    latency = function(n)
        local latency = rpc.ping(n.peer)
        if latency then return latency end
    end,


    remove_failed_node = function(node)
        TKAD.view_lock:lock()
        TKAD.remove_node(TKAD.view, node)
        TKAD.view_lock:unlock() 
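        TKAD.rt_lock:lock()
        -- routing_table is a list of k-buckets (indexed from 0), so remove the node from each bucket
        for i = 0, bits do
            if TKAD.routing_table[i] then TKAD.remove_node(TKAD.routing_table[i], node) end
        end
        TKAD.rt_lock:unlock()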

    end,

-------------------------------------------------------------------------------
-- Convergence
-------------------------------------------------------------------------------

    hash_all = function()
        local ids = {}
        for i,v in ipairs(job.nodes) do
            if not TKAD.same_node(v,me) then
                local hashed_index = compute_id(v.ip..v.port)
                ids[#ids+1] = hashed_index
            end
        end
        return ids
    end,

    precompute_routing_table = function()
        local entries = {}
        if TKAD_CONVERGE then 
            for i = 0, bits do TKAD.ideal_rt[i] = {} end
            local ids = TKAD.hash_all()
            for i,v in ipairs(ids) do
                local buck = TKAD.bucket_num(v)
                if entries[buck] then entries[buck] = entries[buck]+1
                else entries[buck] = 1 end
                --print(v.id, TKAD.bit_out(v), buck) 
                table.insert(TKAD.ideal_rt[buck], v)        
            end
            for i,v in pairs(entries) do
                if v == 1 then TKAD.mand_links = TKAD.mand_links+1
                else
                    if v > TKAD.k then
                        TKAD.mand_links = TKAD.mand_links+1
                        TKAD.opt_links = TKAD.opt_links+TKAD.k-1
                    else
                        TKAD.mand_links = TKAD.mand_links+1
                        TKAD.opt_links = TKAD.opt_links+v-1
                    end
                end
        end
        --TKAD.display_ideal_rt()
        log:print("COMPLETE_VIEW_STATE "..me.id.." mandatory ".. TKAD.mand_links.." optional "..TKAD.opt_links)
        end
    end,

    display_ideal_rt = function()
        log:print("IDEAL ROUTING TABLE ", num(me.id))
        for i = 0, bits do
            if TKAD.ideal_rt[i] and #TKAD.ideal_rt[i] > 0 then
                local out = ""
                for j,v in ipairs(TKAD.ideal_rt[i]) do
                    out = out..num(v).." | "
                end
                log:print(i, out)
            end
        end
    end,


-- Every node must know at least one other node in each of its non-empty subtrees
    check_convergence = function(c,thread)
        if TKAD_CONVERGE then
            local opt_entries, mand_entries = 0,0
            for i = 0, #TKAD.routing_table do
                if TKAD.routing_table[i] and #TKAD.routing_table[i]>0 then
                    mand_entries = mand_entries+1
                    if #TKAD.routing_table[i]>1 then
                        opt_entries = opt_entries+#TKAD.routing_table[i]-1
                    end
                end
            end
            log:print(table.concat({thread," TMAN_cycle ",c," CURRENT_VIEW_STATE ", me.id, " mandatory ",mand_entries/(TKAD.mand_links/100), " optional ",opt_entries/(TKAD.opt_links/100)," bytes_sent ", bytesSent," bytes_received ", bytesReceived}))
            log:print("opt_entries", opt_entries, "mand_entries", mand_entries)
        end
    end,



-------------------------------------------------------------------------------
-- Lookup and routing efficiency
-- alpha = 1 (number of find_node RPCs sent asynchronously)
-------------------------------------------------------------------------------
--return the index of the first node in the results that has not been queried yet, if no such node found returns -1
    already_sent = function(results, sent_set)
        for i,v in ipairs(results) do
            local sent = false
            for _, w in ipairs(sent_set) do
                -- compare by ip/port: peers deserialized from RPC replies are distinct tables
                if TKAD.same_node(v, w) then sent = true break end
            end
            if not sent then return i end
        end
        return -1
    end,

--updates the results list by merging it with set and keeping k closest nodes
    update_results = function(results, set, key)
        local updated = misc.merge(results, set)
        TKAD.remove_dup(updated)
        TKAD.xor_rank_pure(updated, key)
        TKAD.keep_n(updated, TKAD.k)
        return updated
    end,

--selects k closest nodes
    k_closest_bucket_nodes = function(key)
        local results, buck = {}, TKAD.bucket_num(key)
        for i = #TKAD.routing_table, buck, -1 do
            if #TKAD.routing_table[i] > 0 then results = misc.merge(results, TKAD.routing_table[i]) end
        end
        if #results == TKAD.k then return results
        elseif #results > TKAD.k then local set = {} results = TKAD.update_results(results, set, key) return results end
        buck = buck-1   
        while buck >= 0 do
            if #TKAD.routing_table[buck] > 0 then results = misc.merge(results, TKAD.routing_table[buck]) end
            buck = buck-1
            if #results == TKAD.k then return results
            elseif #results > TKAD.k then local set = {} results = TKAD.update_results(results, set, key) return results end        
        end
        return results
    end,


    callback =function(signal, results)
        events.fire(signal,results)
    end,


    find_node = function(signal, me, key)
        events.thread(function()
        local closest = TKAD.k_closest_bucket_nodes(key)
        rpc.call(me.peer, {'TKAD.callback', signal, closest})
        end)
    end,

    find = function(key)
        --the first nodes to contact are k nodes from the closest k-bucket
        -- must be local: concurrent lookups would otherwise overwrite each other's result set
        local results = TKAD.k_closest_bucket_nodes(key)
        local hops = 0
        local start_time = misc.time()
        local sent = {}
        while true do
            local index = TKAD.already_sent(results, sent)
            if  index == -1 then return results, hops, start_time 
            else 
                local current = results[index]
                --display_view(results, "results before update from: "..num(current).."("..bit_out(current.id).."):")
                local signal = current.id..key
                sent[#sent+1] = current
                rpc.call(current.peer, {'TKAD.find_node', signal, me, key})
                local p_results = events.wait(signal)
                --display_view(p_results, "p_results from node: "..num(current).."("..bit_out(current.id).."):")
                results = TKAD.update_results(results, p_results, key)
                --display_view(results, "results after update from: "..num(current).."("..bit_out(current.id).."):")
                hops = hops + 1
            end
        end
    end,

    lookup = function(key)
        log:print("LOOKUP_START ".. num(key).."("..key ..")")
        local nearest_k, hops, start_time = TKAD.find(key)
        local elapsed = misc.time()-start_time
        log:print("LOOKUP_END")
        local correct = TKAD.check_correctness(key, nearest_k)
        local res=nil
        if correct then res = "CORRECT_FOUND " else res = "CORRECT_NOT_FOUND " end
        local msg = res.."LOOKUP KEY "..num(key).." HOPS "..hops.." DELAY "..elapsed.." RESPONSIBLE: "
        for i,v in ipairs(nearest_k) do
            msg = msg..table.concat({" ",num(v)})
        end
        log:print(msg)
    end,

-------------------------------------------------------------------------------
-- Routing correctness
-------------------------------------------------------------------------------

    precompute_resp_nodes = function()
        local ids = TKAD.hash_all()
        ids[#ids+1] = me.id
        for i,k in ipairs(TKAD.keys) do
            TKAD.xor_rank_pure(ids, k)
            TKAD.responsible[k] = {}
            --local out="resp for "..k.." with bucket_num "..TKAD.bucket_num(k)..": " 
            for i=1, TKAD.k do
                table.insert(TKAD.responsible[k],ids[i])
--              out=out.." "..num(ids[i])
            end
--          log:print(out)
        end
    end,

    check_correctness = function(key,closest)
        for i,v in ipairs(closest) do
            --log:print("closest", i, num(v))
            local match = false
            for j,w in ipairs(TKAD.responsible[key]) do
                if num(v) == num(w) then match = true end
            end
            if not match then return false end
        end
        return true
    end,

-------------------------------------------------------------------------------
-- TMAN
-------------------------------------------------------------------------------

    init = function()
        for i = 0, bits - 1 do TKAD.routing_table[i] = {} end
        TKAD.view = misc.random_pick(PSS.view, TKAD.v)
        TKAD.check_convergence(TKAD.cycle,"TKAD.init")
    end,

    -- selects a random node from the TKAD view
    select_peer = function()
        return misc.random_pick(TKAD.view)
    end,

    -- select peer from pss view, different from self
    select_peer_pss = function()
        PSS.view_copy_lock:lock()
        local i 
        repeat i = math.random(#PSS.view_copy)
        until not TKAD.same_node(PSS.view_copy[i], me)
        local partner = PSS.view_copy[i]
        PSS.view_copy_lock:unlock()
        return partner
    end,


    create_message = function(partner)
        TKAD.view_lock:lock()
        local buffer = misc.dup(TKAD.view)
        TKAD.view_lock:unlock()
        TKAD.rt_lock:lock()
        buffer = misc.merge(buffer,PSS.view, TKAD.flatten(TKAD.routing_table))
        TKAD.rt_lock:unlock()
        buffer[#buffer+1] = me
        TKAD.remove_dup(buffer)
        TKAD.remove_node(buffer, partner)
        return buffer
    end,

    --merges TKAD view with the received message
    update_view = function(received)
        TKAD.view_lock:lock()
        TKAD.view = misc.merge(TKAD.view, received)
        TKAD.remove_dup(TKAD.view)
        TKAD.xor_rank_pure(TKAD.view,me)
        --TKAD.xor_rank(TKAD.view,me)
        TKAD.keep_n(TKAD.view, TKAD.v)
        TKAD.view_lock:unlock()
    end,

    -- add nodes from the received to the corresponding k-buckets
    update_prefix_table = function(received)
        TKAD.rt_lock:lock()
        for i,v in ipairs(received) do
            local buck = TKAD.bucket_num(v)
            if not TKAD.routing_table[buck] then
                TKAD.routing_table[buck] = {}
                table.insert(TKAD.routing_table[buck], v)
            else
                if #TKAD.routing_table[buck] < TKAD.k then --bucket is not full
                --check if this element is already in the bucket: yes - move it to the tail, no - add it to the tail
                    local index = TKAD.already_in(TKAD.routing_table[buck], v)
                    if index > 0 then 
                    table.insert(TKAD.routing_table[buck], table.remove(TKAD.routing_table[buck], index))
                    else table.insert(TKAD.routing_table[buck], v) end          
                else --bucket full: ping the element at the head, if there is response - move to the tail; no response - evict and push the new element to the tail
                    if TKAD.latency(TKAD.routing_table[buck][1]) then 
                        table.insert(TKAD.routing_table[buck], table.remove(TKAD.routing_table[buck], 1))
                    else
                        table.remove(TKAD.routing_table[buck], 1)
                        table.insert(TKAD.routing_table[buck], v)
                    end
                end
            end
        end
        TKAD.rt_lock:unlock()
    end,

    passive_thread = function(received,sender)
        local buffer = TKAD.create_message(sender)
        TKAD.update_view(received)
        TKAD.update_prefix_table(received)
        return buffer
    end,

    select_partner = function(partner_select_code, loc_cycle)
        if partner_select_code == 0 then --alternating
            if loc_cycle <= 2 then return TKAD.select_peer_pss()
            else 
                if loc_cycle%2==0 then return TKAD.select_peer_pss() else return TKAD.select_peer() end
            end
        elseif partner_select_code == 1 then --only pss
            return TKAD.select_peer_pss()
        elseif partner_select_code == 2 then --only view
            return TKAD.select_peer()
        end 
    end,

    active_thread = function()
        TKAD.c_lock:lock()
        local loc_cycle = TKAD.cycle + 1
        TKAD.cycle = TKAD.cycle + 1
        TKAD.c_lock:unlock()        
        local partner = TKAD.select_partner(PARTNER_SELECT,loc_cycle)
        local buffer = TKAD.create_message(partner)
        local try = 0
        local ok, res = rpc.acall(partner.peer, {'TKAD.passive_thread', buffer, me})
        while not ok do
            try = try + 1
            if try <= 3 then
                log:print("TKAD active thread: no response from:"..partner.id.. ": "..tostring(res).." => try again")
                events.sleep(math.random(try * 30, try * 60))
                ok, res = rpc.acall(partner.peer, {'TKAD.passive_thread', buffer, me})
            else
                log:print("TKAD active thread: no response from:"..partner.id..": "..tostring(res).."  => removing it from view")
                TKAD.remove_failed_node(partner)
                break
            end
        end
        if ok then
            local received = res[1]
            TKAD.update_view(received)
            TKAD.update_prefix_table(received)
            --resource_stats()
            TKAD.check_convergence(loc_cycle,"TKAD.active_thread")
            --TKAD.debug(loc_cycle)
        end
    end,    
    }


-------------------------------------------------------------------------------
-- Main loop
-------------------------------------------------------------------------------


function resource_stats()
    --log:print("MEMORY_USED_Kb ", gcinfo())    
    local ts,tr = socket.stats()
    local tot_KB_sent=misc.bitcalc(ts).kilobytes
    local tot_KB_recv=misc.bitcalc(tr).kilobytes
    --log:print("BANDWIDTH_TOTAL ",tot_KB_sent, tot_KB_recv)
    --log:print("BANDWIDTH_RATE  ", (tot_KB_sent - bytesSent )/STATS_PERIOD, (tot_KB_recv - bytesReceived) /STATS_PERIOD)
    -- note: despite their names, these globals hold kilobytes, not bytes
    bytesSent = tot_KB_sent
    bytesReceived = tot_KB_recv
end


function terminator()
  events.sleep(max_time)
  os.exit()
end

function main()
-- this thread will be in charge of killing the node after max_time seconds
    events.thread(terminator)

    log:print("UP: "..job.me.ip..":"..job.me.port.." id "..me.id)

-- init random number generator
    math.randomseed(job.position*os.time())

-- wait for all nodes to start up (conservative)
    events.sleep(5)

-- desynchronize the nodes
    local desync_wait = (GOSSIP_TIME * math.random())
    log:print("waiting for "..desync_wait.." to desynchronize")
    events.sleep(desync_wait)

    PSS.pss_init()
    PSS_thread = events.periodic(PSS_SHUFFLE_PERIOD, PSS.pss_active_thread)
    events.sleep(60)

    TKAD.precompute_routing_table()
    TKAD.init() 
    events.sleep(20)

    log:print("VIEW_CONSTRUCTION_START_TIME", misc.time())
    TKAD_thread = events.periodic(GOSSIP_TIME, TKAD.active_thread)

    events.sleep(200)

    events.kill(TKAD_thread)
    events.kill(PSS_thread)

    if TKAD_LOOKUP then
        --generate keys
        for i = 1, 50 do
            TKAD.keys[#TKAD.keys+1] = compute_id(math.random())
        end

        --precompute responsible nodes
        TKAD.precompute_resp_nodes()        
        events.sleep(20)

        --start lookups
        for i,v in ipairs(TKAD.keys) do
            events.thread(function() TKAD.lookup(v) end)
            events.sleep(1)
        end   
    end
end

events.thread(main)
events.loop()
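
The retry loop in `TKAD.active_thread` sleeps for `math.random(try * 30, try * 60)` seconds before each of its three retries. The following is a minimal sketch in plain Lua of how long a single unresponsive partner can keep one gossip thread asleep. The helper `backoff_bounds` is hypothetical and not part of Splay, and RPC timeouts are not counted.

```lua
-- Sketch: bounds on the total sleep time of the retry loop in TKAD.active_thread.
-- Mirrors events.sleep(math.random(try * 30, try * 60)) for try = 1..max_tries.
-- backoff_bounds is a hypothetical helper, not part of Splay.
local function backoff_bounds(max_tries, lo_factor, hi_factor)
    local min_total, max_total = 0, 0
    for try = 1, max_tries do
        min_total = min_total + try * lo_factor  -- shortest possible sleep for this try
        max_total = max_total + try * hi_factor  -- longest possible sleep for this try
    end
    return min_total, max_total
end

local min_s, max_s = backoff_bounds(3, 30, 60)
print(min_s, max_s)  -- 180 and 360 seconds
```

With these numbers, a thread whose partner never answers sleeps for between 180 and 360 seconds in total, which is comparable to the 200-second gossip phase in `main`.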
@etriviere

Hi

How can you be sure this is a concurrency problem? A simple way to check would be to run random chains of RPCs between nodes in a network and see whether the delays exceed what is expected from serving the events/messages currently being processed.

Or do you mean we should have multi-threaded support for Splay nodes? This would make development for each node much more complex, and I am not sure the benefit is worth it.
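
A rough sketch of such an RPC-chain probe is given below, in plain Lua. All names (`make_probe`, `chain_step`, `measure`, `fake_call`) are hypothetical. The transport is injected as `call`; under Splay it would presumably wrap `rpc.acall(peer, {"chain_step", hops_left})`, which, judging from its use in the listing, returns `ok` plus a table of results. Here a local stand-in invokes the handler directly, so the sketch runs without the Splay runtime.

```lua
-- Sketch of an RPC-chain latency probe (hypothetical names, not a Splay API).
local function make_probe(nodes, call, pick, now)
    local probe = {}

    -- Forward the chain to a random node until hops_left reaches 0.
    -- Returns the number of hops completed downstream, or nil on failure.
    function probe.chain_step(hops_left)
        if hops_left <= 0 then return 0 end
        local peer = nodes[pick(#nodes)]
        local ok, res = call(peer, {"chain_step", hops_left - 1})
        if not ok or res[1] == nil then return nil end
        return res[1] + 1
    end

    -- Time one chain of `hops` RPCs; returns elapsed time and hops completed.
    function probe.measure(hops)
        local t0 = now()
        local done = probe.chain_step(hops)
        return now() - t0, done
    end

    return probe
end

-- Local stand-in for the RPC layer (assumption): calls the handler directly
-- and mimics the (ok, {results}) return shape used with rpc.acall in the listing.
local probe
local function fake_call(peer, req)
    return true, {probe[req[1]](req[2])}
end

local nodes = {{ip = "127.0.0.1", port = 20001}, {ip = "127.0.0.1", port = 20002}}
probe = make_probe(nodes, fake_call, math.random, os.clock)

local elapsed, done = probe.measure(5)
print(done, elapsed)
```

Comparing `elapsed` for a fixed chain length while varying the number of chains in flight would indicate whether the extra delay comes from queuing inside the runtime rather than from the network.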

best
Etienne


@vschiavoni
Member Author

I'm confident it's a concurrency issue because the same scenario with fewer concurrent queries (and thus fewer overlapping in-flight queries traversing the network) does not produce the behaviour shown in the attached Gantt chart (that is, queries getting slower and slower over time).
It'd be nice to have a simpler test case, I agree.
Exploiting the multi-core nature of today's hardware, using the same facilities we're used to in Splay, to produce more efficient code would not hurt: as of today, the observed performance of the prototypes implemented in Splay is somewhat limited by the (fake) concurrency support. Something I had experimented with was https://github.com/vschiavoni/splay_llthreads
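
One way to picture queries getting slower over time is a toy model in which a single event loop serves requests strictly one at a time. This is an assumption made for illustration, not a description of Splay's actual scheduler, and `simulate` is a hypothetical helper. When requests arrive faster than they can be served, each one waits behind a growing backlog.

```lua
-- Toy model (an assumption, not Splay's scheduler): one event loop serves
-- requests one at a time, each needing `service` time units.
local function simulate(arrivals, service)
    local clock, latencies = 0, {}
    for i, t_arrive in ipairs(arrivals) do
        -- the loop becomes free at `clock`; a request cannot start before it arrives
        local start = math.max(clock, t_arrive)
        clock = start + service
        latencies[i] = clock - t_arrive
    end
    return latencies
end

-- Requests arrive every 1 unit but each takes 2 units to serve: the backlog grows.
local lat = simulate({0, 1, 2, 3, 4}, 2)
print(table.concat(lat, " "))  -- 2 3 4 5 6
```

With arrivals spaced further apart than the service time (for example `simulate({0, 3, 6}, 2)`), every latency stays at 2, which matches the observation that the slowdown disappears with fewer overlapping queries.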

@etriviere

This is true, but it would require a redesign that goes beyond what we can do. In particular, we would probably have to move away from Lua. Not sure this is worth the pain.

Etienne


@vschiavoni changed the title from "Blocked queries (probably due" to "Blocked queries" on Feb 4, 2016