Skip to content

Commit

Permalink
geoip: quacking like a SubscriptionObserver is enough
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Oct 3, 2023
1 parent 4070fe2 commit f06b9d2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 38 deletions.
32 changes: 24 additions & 8 deletions x-pack/lib/geoip_database_management/subscription_observer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ module LogStash module GeoipDatabaseManagement
module SubscriptionObserver

##
# Coerce an object into an SubscriptionObserver, if necessary
# Coerce an object into an `SubscriptionObserver`, if necessary
# @overload coerce(observer)
# @param observer [SubscriptionObserver]
# @param observer [SubscriptionObserver]: an object that "quacks like" a `SubscriptionObserver`
# as defined by `SubscriptionObserver::===`
# @return [SubscriptionObserver]
# @overload coerce(construct:, :on_update, :on_expire)
# @param construct [Proc(DbInfo)->void]: a single-arity Proc that will receive the current
Expand All @@ -32,10 +33,25 @@ module SubscriptionObserver
# @return [SubscriptionObserver::Proxy]
# @api public
def self.coerce(observer_spec)
return observer_spec if SubscriptionObserver === observer_spec
return Proxy.new(**observer_spec) if observer_spec.kind_of?(Hash)
case observer_spec
when SubscriptionObserver then observer_spec
when Hash then Proxy.new(**observer_spec)
else
fail ArgumentError, "Could not make a SubscriptionObserver from #{observer_spec.inspect}"
end
end

##
# Quacks-like check, to simplify consuming from Java where the ruby module can't be
# directly mixed into a Java class
def self.===(candidate)
return true if super

return false unless candidate.respond_to?(:construct)
return false unless candidate.respond_to?(:on_update)
return false unless candidate.respond_to?(:on_expire)

fail ArgumentError, "Could not make a SubscriptionObserver from #{observer_spec.inspect}"
true
end

##
Expand Down Expand Up @@ -63,9 +79,9 @@ class Proxy
include SubscriptionObserver

def initialize(construct:, on_update:, on_expire:)
fail ArgumentError unless construct.lambda? && construct.arity == 1
fail ArgumentError unless on_update.lambda? && on_update.arity == 1
fail ArgumentError unless on_expire.lambda? && on_expire.arity == 0
fail ArgumentError unless construct.respond_to?(:call) && construct.arity == 1
fail ArgumentError unless on_update.respond_to?(:call) && on_update.arity == 1
fail ArgumentError unless on_expire.respond_to?(:call) && on_expire.arity == 0

@construct = construct
@on_update = on_update
Expand Down
81 changes: 51 additions & 30 deletions x-pack/spec/geoip_database_management/subscription_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,48 +155,69 @@
end

context "#observe" do
def observe_to_log(subscription, log)
subscription.observe(construct: ->(v) { log << [:construct, v]},
on_update: ->(v) { log << [:on_update, v]},
on_expire: ->( ) { log << [:on_expire] })
end
shared_examples "observation" do
let!(:log) { Queue.new }

it 'basic functionality' do
log = Queue.new
it "observes construct, update, and expiry" do
current_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/one/two")
subscription.notify(current_value)
expect(log).to be_empty

current_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/one/two")
subscription.notify(current_value)
subscription.observe(observer_spec)

observe_to_log(subscription, log)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:construct, current_value])

expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:construct, current_value])
updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/three/four")
subscription.notify(updated_value)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_update, updated_value])

updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/three/four")
subscription.notify(updated_value)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_update, updated_value])
expired_value = LogStash::GeoipDatabaseManagement::DbInfo::EXPIRED
subscription.notify(expired_value)

expired_value = LogStash::GeoipDatabaseManagement::DbInfo::EXPIRED
subscription.notify(expired_value)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_expire])

expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_expire])
another_updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/five/six")
subscription.notify(another_updated_value)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_update, another_updated_value])
end

another_updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/five/six")
subscription.notify(another_updated_value)
expect(log.size).to eq(1)
expect(log.pop(true)).to eq([:on_update, another_updated_value])
context 'when subscription was previously released' do
before(:each) { subscription.release! }
it 'prevents new observation' do
expect { subscription.observe(observer_spec) }.to raise_exception(/released/)
expect(log).to be_empty
end
end
end

context 'when subscription was previously released' do
before(:each) { subscription.release! }
it 'prevents new observation' do
log = Queue.new
context "when given a components hash" do
let(:observer_spec) {
{
construct: ->(v) { log << [:construct, v]},
on_update: ->(v) { log << [:on_update, v]},
on_expire: ->( ) { log << [:on_expire] },
}
}

expect { observe_to_log(subscription, log) }.to raise_exception(/released/)
expect(log).to be_empty
include_examples "observation"
end

context "when given an object that quacks like a SubscriptionObserver instance" do
let(:observer_class) do
Class.new do
def initialize(log); @log = log; end
def construct(v); @log << [:construct, v]; end
def on_update(v); @log << [:on_update, v]; end
def on_expire; @log << [:on_expire]; end
end
end
let(:observer_spec) { observer_class.new(log) }

include_examples "observation"
end
end
end

0 comments on commit f06b9d2

Please sign in to comment.