Skip to content

Commit

Permalink
Merge pull request #11 from rightscale/CF-2601_neverblock_and_threads
Browse files Browse the repository at this point in the history
CF-2601 Have Neverblock + threads play nice
  • Loading branch information
psschroeter authored Nov 6, 2017
2 parents be30ac1 + dcfd591 commit 5973921
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 17 deletions.
10 changes: 6 additions & 4 deletions lib/neverblock.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'net/http'

# Author:: Mohammad A. Ali (mailto:[email protected])
# Copyright:: Copyright (c) 2008 eSpace, Inc.
# License:: Distributes under the same terms as Ruby
Expand All @@ -14,16 +16,16 @@ def self.logger

# Checks if we should be working in a non-blocking mode
def self.neverblocking?
NB::Fiber.respond_to?(:current) && NB::Fiber.current.respond_to?('[]') && NB::Fiber.current[:neverblock] && NB.reactor.reactor_running?
NB::Fiber.current.respond_to?(:neverblock) && NB::Fiber.current.neverblock && NB.reactor.reactor_running?
end

# The given block will run its queries either in blocking or non-blocking
# mode based on the first parameter
def self.neverblock(nb = true, &block)
status = NB::Fiber.current[:neverblock]
NB::Fiber.current[:neverblock] = nb
status = NB::Fiber.current.neverblock
NB::Fiber.current.neverblock = !!nb
block.call
NB::Fiber.current[:neverblock] = status
NB::Fiber.current.neverblock = status
end

# Exception to be thrown for all neverblock internal errors
Expand Down
103 changes: 94 additions & 9 deletions lib/neverblock/core/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,35 @@
# License:: Distributes under the same terms as Ruby

require 'fiber'
require 'thread'

class NeverBlock::Fiber < Fiber
attr_accessor :lock_count, :neverblock

def initialize(neverblock = true, &block)
self[:neverblock] = neverblock
@local_fiber_variables = {}
@neverblock = neverblock
@lock_count = 0
super()
end


#Attribute Reference--Returns the value of a fiber-local variable, using
#either a symbol or a string name. If the specified variable does not exist,
#returns nil.
def [](key)
local_fiber_variables[key]
@local_fiber_variables[key]
end

#Attribute Assignment--Sets or creates the value of a fiber-local variable,
#using either a symbol or a string. See also Fiber#[].
def []=(key,value)
local_fiber_variables[key] = value
@local_fiber_variables[key] = value
end


def delete(key)
@local_fiber_variables.delete(key)
end

#Sending an exception instance to resume will yield the fiber
#and then raise the exception. This is necessary to raise exceptions
#in their correct context.
Expand All @@ -33,12 +40,90 @@ def self.yield(*args)
raise result if result.is_a? Exception
result
end
end

# Code that is designed to work with mutexes and threads but isn't aware of fibers
# will potentially screw up.
class Mutex
alias :_orig_try_lock :try_lock
def try_lock
acquired = _orig_try_lock
disable_neverblock if acquired
acquired
end

# sleep is mostly used by ConditionVariable
alias :_orig_sleep :sleep
def sleep(*args)
restore_neverblock
ret = _orig_sleep(*args)
disable_neverblock
ret
end

alias :_orig_lock :lock
def lock
ret = _orig_lock
disable_neverblock
ret
end

alias :_orig_unlock :unlock
def unlock
restore_neverblock
_orig_unlock
end

alias :_orig_synchronize :synchronize
def synchronize
_orig_synchronize do
begin
disable_neverblock
yield
ensure
restore_neverblock
end
end
end

private

def local_fiber_variables
@local_fiber_variables ||= {}

# slow, only uncomment for specs/dev
def _dbg(title)
call_stack = caller[2..4].map{|path| path.gsub(/.*(cache\/|gems\/)/,'')}.join("->")
fiber_str = NB::Fiber.current[:nb_fiber_pool_idx] || NB::Fiber.current.object_id
unless call_stack =~ /merb_syslog_logger.*log/ || call_stack =~ /eventmachine.*next_tick/
$stderr.puts "DEBUG: #{title}: Mutex.lock=#{self.object_id} Fiber=#{fiber_str} Callers=#{call_stack}"
end
end

end
# disable_neverblock disables neverblock using from using its special methods for the current
# fiber. See the "NeverBlock.neverblocking?" function for reference. With this function returning
# false, all neverblock overriden methods should fall back to the standard ruby versions which
# should not switch fibers. Switching fibers in the lock/synchronize section of a mutex can
# lead to "recursive lock" errors as the two fibers can call lock/synchronize twice in the same
# thread. See CF-2601 for reference.
def disable_neverblock
if NB::Fiber.current.respond_to?(:neverblock)
current_fiber = NB::Fiber.current
if current_fiber.lock_count == 0
current_fiber[:neverblock_save] = current_fiber.neverblock
current_fiber.neverblock = false
#_dbg('NEVERBLOCK DISABLED') if current_fiber[:neverblock_save]
end
current_fiber.lock_count += 1
end
end

# restore_neverblock undoes disable_neverblock
def restore_neverblock
if NB::Fiber.current.respond_to?(:neverblock)
current_fiber = NB::Fiber.current
current_fiber.lock_count -= 1
if current_fiber.lock_count == 0
current_fiber.neverblock = current_fiber.delete(:neverblock_save)
#_dbg('NEVERBLOCK RESTORED') if current_fiber.neverblock
end
end
end
end
3 changes: 2 additions & 1 deletion lib/neverblock/core/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def on_empty(&blk)
def spawn(evented = true, &block)
if fiber = @fibers.shift
@busy_fibers[fiber.object_id] = fiber
fiber[:neverblock] = evented
fiber.neverblock = evented
fiber.lock_count = 0
fiber.resume(block)
else
@queue << block
Expand Down
2 changes: 1 addition & 1 deletion neverblock.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require 'rake'

Gem::Specification.new do |s|
s.name = "neverblock"
s.version = "2.1"
s.version = "2.2"
s.date = "2014-01-31"
s.summary = "Utilities for non-blocking stack components"
s.email = "[email protected]"
Expand Down
150 changes: 150 additions & 0 deletions spec/core/fiber_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require_relative "../spec_helper"

describe NeverBlock::Fiber do
context "with mutex" do
before(:each) { @mutex = Mutex.new }
it "turns off neverblock in synchronize section" do
results = []
EM.run do
fiber_pool = NB::FiberPool.new(1)
fiber_pool.spawn do
results << NB.neverblocking?
@mutex.synchronize do
results << NB.neverblocking?
end
results << NB.neverblocking?
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, true]
end

it "handles multiple synchronizes" do
results = []
@mutex2 = Mutex.new
EM.run do
fiber_pool = NB::FiberPool.new(1)
fiber_pool.spawn do
results << NB.neverblocking?
@mutex.synchronize do
results << NB.neverblocking?
@mutex2.synchronize do
results << NB.neverblocking?
end
results << NB.neverblocking?
end
results << NB.neverblocking?
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, false, false, true]
end

it "handles interwoven locks" do
results = []
@mutex2 = Mutex.new
@mutex3 = Mutex.new
EM.run do
fiber_pool = NB::FiberPool.new(1)
fiber_pool.spawn do
# first result is true -- all the rest are false till everything unlocks at the end
results << NB.neverblocking?
@mutex.lock
results << NB.neverblocking?
@mutex2.lock
results << NB.neverblocking?
@mutex3.synchronize { results << NB.neverblocking? }
@mutex.unlock
results << NB.neverblocking?
@mutex3.synchronize { results << NB.neverblocking? }
@mutex2.unlock
results << NB.neverblocking? # expects true
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, false, false, false, false, true]
end

it "recovers from errors in synchronize" do
results = []
EM.run do
fiber_pool = NB::FiberPool.new(5)
fiber_pool.spawn do
results << NB.neverblocking?
begin
@mutex.synchronize do
results << NB.neverblocking?
raise "ERROR"
end
rescue
end
results << NB.neverblocking?
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, true]
end

# For something like a logger may be background threads calling the logger's
# mutex to synchronize log calls. Those other threads shouldn't affect
# the main event loop thread.
it "turns off neverblock when mutex is locked" do
results = []
results_by_thread = {}
EM.run do
fiber_pool = NB::FiberPool.new(5)
fiber_pool.spawn do
results << NB.neverblocking? # expect true
@mutex.lock
threads = []
5.times do
threads << Thread.new do
@mutex.lock
results_by_thread[Thread.current.object_id] = NB.neverblocking?
@mutex.unlock
end
end
results << NB.neverblocking? # expect false
@mutex.unlock
threads.each(&:join)

results << NB.neverblocking? # expect true
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, true]
results_by_thread.values.each do |results|
results.should == false
end
end

# For something like a logger may be background threads calling the logger's
# mutex to synchronize log calls. Those other threads shouldn't affect
# the main event loop thread.
it "handles conditional variables (which use mutex.sleep)" do
results = []
@cv = ConditionVariable.new
EM.run do
fiber_pool = NB::FiberPool.new(5)
fiber_pool.spawn do
results << NB.neverblocking? # expect true
@mutex.lock
results << NB.neverblocking? # expect false
t = Thread.new do
@mutex.synchronize do
results << NB.neverblocking? # expect false
@cv.signal
end
end
@cv.wait(@mutex) # Calls @mutex.sleep and waits for thread to call signal
results << NB.neverblocking? # expect false
@mutex.unlock
results << NB.neverblocking? # expect true
end
fiber_pool.spawn { EM.stop }
end
results.should == [true, false, false, false, true]
end

end
end
4 changes: 2 additions & 2 deletions spec/fiber_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
@fiber_pool.instance_variable_get(:@queue).length.should == 0
end

it "should have fibers with :neverblock fiber variable set to true" do
@fiber_pool.fibers.each {|f| f[:neverblock].should == true}
it "should have fibers with @neverblock instance variable set to true" do
@fiber_pool.fibers.each {|f| f.neverblock.should == true}
end

it "should process a new block if there are available fibers" do
Expand Down

0 comments on commit 5973921

Please sign in to comment.