Skip to content

Commit

Permalink
Change redis client library used in outbox
Browse files Browse the repository at this point in the history
Follow the change in sidekiq7, swap Redis to RedisClient
  • Loading branch information
porbas committed Apr 12, 2024
1 parent 2000d9c commit 2caeb5a
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 54 deletions.
2 changes: 1 addition & 1 deletion contrib/ruby_event_store-outbox/Gemfile.rails_6_1
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ eval_gemfile "../../support/bundler/Gemfile.database"

gem "ruby_event_store", path: "../.."

gem "sidekiq", "~> 6.2"
gem "sidekiq", "~> 7.2"
gem "influxdb", "~> 0.8.1", require: false
gem "childprocess"
gem "rails", "~> 6.1.7"
14 changes: 8 additions & 6 deletions contrib/ruby_event_store-outbox/Gemfile.rails_6_1.lock
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ GEM
rake (>= 12.2)
thor (~> 1.0)
rake (13.1.0)
redis (4.8.1)
redis-client (0.21.1)
connection_pool
regexp_parser (2.6.2)
rspec (3.12.0)
rspec-core (~> 3.12.0)
Expand All @@ -185,10 +186,11 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0)
rspec-support (3.12.1)
sidekiq (6.5.9)
connection_pool (>= 2.2.5, < 3)
rack (~> 2.0)
redis (>= 4.5.0, < 5)
sidekiq (7.2.2)
concurrent-ruby (< 2)
connection_pool (>= 2.3.0)
rack (>= 2.2.4)
redis-client (>= 0.19.0)
sorbet-runtime (0.5.11042)
sprockets (4.2.1)
concurrent-ruby (~> 1.0)
Expand Down Expand Up @@ -233,7 +235,7 @@ DEPENDENCIES
rspec (~> 3.6)
ruby_event_store!
ruby_event_store-outbox!
sidekiq (~> 6.2)
sidekiq (~> 7.2)
sqlite3 (~> 1.7)

BUNDLED WITH
Expand Down
1 change: 1 addition & 0 deletions contrib/ruby_event_store-outbox/Gemfile.sidekiq_6_5
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ eval_gemfile "../../support/bundler/Gemfile.database"
gem "ruby_event_store", path: "../.."

gem "sidekiq", "~> 6.5"
gem "redis-client"
gem "influxdb", "~> 0.8.1", require: false
gem "childprocess"
gem "rails", "~> 6.1.7"
3 changes: 3 additions & 0 deletions contrib/ruby_event_store-outbox/Gemfile.sidekiq_6_5.lock
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ GEM
thor (~> 1.0)
rake (13.1.0)
redis (4.8.1)
redis-client (0.21.1)
connection_pool
regexp_parser (2.6.2)
rspec (3.12.0)
rspec-core (~> 3.12.0)
Expand Down Expand Up @@ -230,6 +232,7 @@ DEPENDENCIES
pg (~> 1.5.4)
rails (~> 6.1.7)
rake (>= 10.0)
redis-client
rspec (~> 3.6)
ruby_event_store!
ruby_event_store-outbox!
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require "logger"
require "redis"
require "redis-client"
require "active_record"
require_relative "repository"
require_relative "sidekiq5_format"
Expand All @@ -24,7 +24,8 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
@consumer_uuid = consumer_uuid

raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT
@processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url))
redis_config = RedisClient.config(url: configuration.redis_url)
@processor = SidekiqProcessor.new(redis_config.new_client)

@repository = Repository.new(configuration.database_url)
@cleanup_strategy = CleanupStrategies.build(configuration, repository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def process(record, now)
raise InvalidPayload.new("Missing queue") if queue.nil? || queue.empty?
payload = JSON.generate(parsed_record.merge({ "enqueued_at" => now.to_f }))

redis.lpush("queue:#{queue}", payload)
redis.call("LPUSH", "queue:#{queue}", payload)

@recently_used_queues << queue
rescue Redis::TimeoutError, Redis::ConnectionError
rescue RedisClient::TimeoutError, RedisClient::ConnectionError
raise RetriableError
end

Expand All @@ -38,7 +38,7 @@ def message_format

def ensure_that_sidekiq_knows_about_all_queues
if !@recently_used_queues.empty?
redis.sadd("queues", @recently_used_queues.to_a)
redis.call("SADD", "queues", @recently_used_queues.to_a)
@recently_used_queues.clear
end
end
Expand Down
54 changes: 27 additions & 27 deletions contrib/ruby_event_store-outbox/spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outbox

let(:redis_url) { RedisIsolation.redis_url }
let(:database_url) { ENV["DATABASE_URL"] }
let(:redis) { Redis.new(url: redis_url) }
let(:redis) { RedisClient.config(url: redis_url).new_client }
let(:logger_output) { StringIO.new }
let(:logger) { Logger.new(logger_output) }
let(:default_configuration) do
Expand Down Expand Up @@ -45,8 +45,8 @@ module Outbox
result = consumer.process

record.reload
expect(redis.llen("queue:default")).to eq(1)
payload_in_redis = JSON.parse(redis.lindex("queue:default", 0))
expect(redis.call("LLEN", "queue:default")).to eq(1)
payload_in_redis = JSON.parse(redis.call("LINDEX", "queue:default", 0))
expect(payload_in_redis).to include(JSON.parse(record.payload))
expect(payload_in_redis["enqueued_at"]).to eq(clock.tick(1).to_f)
expect(record.enqueued_at).to eq(clock.tick(1))
Expand All @@ -64,8 +64,8 @@ module Outbox

consumer.process

expect(redis.llen("queue:default")).to eq(2)
expect(redis.llen("queue:default2")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(2)
expect(redis.call("LLEN", "queue:default2")).to eq(1)
end

specify "sidekiq processor ensures that used queues do exist" do
Expand All @@ -76,8 +76,8 @@ module Outbox

consumer.process

expect(redis.scard("queues")).to eq(2)
expect(redis.smembers("queues")).to match_array(%w[queue queue2])
expect(redis.call("SCARD", "queues")).to eq(2)
expect(redis.call("SMEMBERS", "queues")).to match_array(%w[queue queue2])
end

specify "returns false if no records" do
Expand All @@ -101,7 +101,7 @@ module Outbox
result = consumer.process

expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
end

specify "already processed should be ignored" do
Expand All @@ -112,7 +112,7 @@ module Outbox
result = consumer.process

expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
expect(logger_output.string).to be_empty
end

Expand All @@ -123,7 +123,7 @@ module Outbox
result = consumer.process

expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
expect(logger_output.string).to be_empty
end

Expand All @@ -134,7 +134,7 @@ module Outbox
result = consumer.process

expect(result).to eq(false)
expect(redis.llen("queue:other_one")).to eq(0)
expect(redis.call("LLEN", "queue:other_one")).to eq(0)
expect(logger_output.string).to be_empty
end

Expand Down Expand Up @@ -172,7 +172,7 @@ module Outbox
result = consumer.process

expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
end

specify "incorrect payload wont cause later messages to schedule" do
Expand All @@ -188,7 +188,7 @@ module Outbox
expect(result).to eq(true)
expect(record1.reload.enqueued_at).to be_nil
expect(record2.reload.enqueued_at).to be_present
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(logger_output.string).to include("JSON::ParserError")
end

Expand All @@ -209,7 +209,7 @@ module Outbox
expect(logger_output.string).to include("Obtaining lock for split_key 'default' failed (deadlock)")
expect(test_metrics.operation_results).to include({ operation: "obtain", result: "deadlocked" })
expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
end

specify "lock timeout when obtaining lock just skip that attempt" do
Expand All @@ -229,7 +229,7 @@ module Outbox
expect(logger_output.string).to include("Obtaining lock for split_key 'default' failed (lock timeout)")
expect(test_metrics.operation_results).to include({ operation: "obtain", result: "lock_timeout" })
expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
end

specify "obtaining taken lock just skip that attempt" do
Expand All @@ -249,7 +249,7 @@ module Outbox
expect(logger_output.string).to include("Obtaining lock for split_key 'default' unsuccessful (taken)")
expect(test_metrics.operation_results).to include({ operation: "obtain", result: "taken" })
expect(result).to eq(false)
expect(redis.llen("queue:default")).to eq(0)
expect(redis.call("LLEN", "queue:default")).to eq(0)
end

specify "deadlock when releasing lock doesnt do anything" do
Expand All @@ -276,7 +276,7 @@ module Outbox
expect(logger_output.string).to include("Releasing lock for split_key 'default' failed \(deadlock\)")
expect(test_metrics.operation_results).to include({ operation: "release", result: "deadlocked" })
expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
end

specify "lock timeout when releasing lock doesnt do anything" do
Expand All @@ -303,7 +303,7 @@ module Outbox
expect(logger_output.string).to include("Releasing lock for split_key 'default' failed (lock timeout)")
expect(test_metrics.operation_results).to include({ operation: "release", result: "lock_timeout" })
expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
end

specify "after successful loop, lock is released" do
Expand Down Expand Up @@ -336,7 +336,7 @@ module Outbox
)
expect(test_metrics.operation_results).to include({ operation: "release", result: "not_taken_by_this_process" })
expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
end

specify "lock stolen in the meantime, doesnt do anything" do
Expand All @@ -353,7 +353,7 @@ module Outbox

expect(logger_output.string).to match(/Releasing lock .* failed \(not taken by this process\)/)
expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
end

specify "old lock can be reobtained" do
Expand All @@ -368,7 +368,7 @@ module Outbox
result = consumer.process

expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(record.reload.enqueued_at).to be_present
end

Expand Down Expand Up @@ -399,7 +399,7 @@ module Outbox
result = consumer.process

expect(result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(record.reload.enqueued_at).to be_present
end

Expand All @@ -411,7 +411,7 @@ module Outbox
second_loop_result = consumer.process

expect(second_loop_result).to eq(true)
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(record.reload.enqueued_at).to be_present
end

Expand Down Expand Up @@ -483,7 +483,7 @@ module Outbox
metrics: null_metrics
)
consumer.process
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(Repository::Record.count).to eq(1)
travel (7.days + 1.minute)

Expand All @@ -504,7 +504,7 @@ module Outbox
metrics: null_metrics
)
consumer.process
expect(redis.llen("queue:default")).to eq(3)
expect(redis.call("LLEN", "queue:default")).to eq(3)
expect(Repository::Record.count).to eq(3)
travel (7.days + 1.minute)

Expand All @@ -525,7 +525,7 @@ module Outbox
metrics: test_metrics
)
consumer.process
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(Repository::Record.count).to eq(1)
travel (7.days + 1.minute)

Expand All @@ -549,7 +549,7 @@ module Outbox
metrics: test_metrics
)
consumer.process
expect(redis.llen("queue:default")).to eq(1)
expect(redis.call("LLEN", "queue:default")).to eq(1)
expect(Repository::Record.count).to eq(1)
travel (7.days + 1.minute)

Expand Down
2 changes: 1 addition & 1 deletion contrib/ruby_event_store-outbox/spec/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Outbox

let(:redis_url) { RedisIsolation.redis_url }
let(:database_url) { ENV["DATABASE_URL"] }
let(:redis) { Redis.new(url: redis_url) }
let(:redis) { RedisClient.config(url: redis_url).new_client }
let(:logger_output) { StringIO.new }
let(:logger) { Logger.new(logger_output) }
let(:default_configuration) do
Expand Down
Loading

0 comments on commit 2caeb5a

Please sign in to comment.