Skip to content

Commit

Permalink
Implement multi queue to reduce test files load time
Browse files Browse the repository at this point in the history
  • Loading branch information
zarifmahfuz committed Feb 15, 2024
1 parent f9ec197 commit 9452e6b
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 10 deletions.
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ def from_uri(url, config)
File
when 'redis', 'rediss'
require 'ci/queue/redis'
Redis
if config.multi_queue_config
require 'ci/queue/multi_queue'
MultiQueue
else
Redis
end
else
raise ArgumentError, "Don't know how to handle #{uri.scheme} URLs"
end
Expand Down
5 changes: 4 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Configuration
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
attr_accessor :multi_queue_config
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -37,7 +38,8 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
multi_queue_config: nil)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@failure_file = failure_file
Expand All @@ -64,6 +66,7 @@ def initialize(
@warnings_file = warnings_file
@debug_log = debug_log
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
@multi_queue_config = multi_queue_config
end

def queue_init_timeout
Expand Down
216 changes: 216 additions & 0 deletions ruby/lib/ci/queue/multi_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# frozen_string_literal: true

require "benchmark"
require "forwardable"

module CI
module Queue
class MultiQueue
include Common
extend Forwardable

class << self
def from_uri(uri, config)
new(uri.to_s, config)
end
end

attr_reader :redis_url, :config, :redis, :current_queue

def initialize(redis_url, config)
@redis_url = redis_url
@config = config
if ::Redis::VERSION > "5.0.0"
@redis = ::Redis.new(
url: redis_url,
# Booting a CI worker is costly, so in case of a Redis blip,
# it makes sense to retry for a while before giving up.
)
else
@redis = ::Redis.new(url: redis_url)
end
@shutdown_required = false
end

def distributed?
true
end

def retrying?
queues.any?(&:retrying?)
end

def expired?
queues.any?(&:expired?)
end

def exhausted?
queues.all?(&:exhausted?)
end

def total
queues.each { |q| q.wait_for_master(timeout: config.queue_init_timeout) }
queues.sum(&:total)
end

def size
queues.sum(&:size)
end

def progress
total - size
end

def remaining
queues.sum(&:remaining)
end

def running
queues.sum(&:running)
end

def poll
queues.each do |q|
@current_queue = q

begin
if q.exhausted?
puts "# All tests executed in #{q.name} queue, skipping..."
next
end

q.load_tests!

puts "# Processing #{q.size} tests in #{q.name} queue..."

q.poll do |test|
yield test
end
rescue *CI::Queue::Redis::Base::CONNECTION_ERRORS
end
end
end

def max_test_failed?
return false if config.max_test_failed.nil?

queues.sum(&:test_failed) >= config.max_test_failed
end

def build
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end

def supervisor
@supervisor ||= Supervisor.new(multi_queue: self)
end

def retry_queue
# TODO: implement
end

def created_at=(time)
queues.each { |q| q.created_at = time }
end

def shutdown!
@shutdown_required = true
end

def shutdown_required?
@shutdown_required
end

def_delegators :@current_queue, :acknowledge, :requeue, :populate, :release!, :increment_test_failed

# TODO: move heartbeat into module
def boot_heartbeat_process!; end

def with_heartbeat(id)
yield
end

def ensure_heartbeat_thread_alive!; end

def stop_heartbeat!; end

class SubQueue < SimpleDelegator
attr_reader :name

def initialize(worker:, multi_queue:, name:, test_files:, preload_files:)
super(worker)
@name = name
@test_files = test_files
@multi_queue = multi_queue
@preload_files = preload_files
@preloaded = false
end

def load_tests!
duration = Benchmark.realtime do
@test_files.each do |test_file|
require ::File.expand_path(test_file)
end
end

puts "# Loaded #{@test_files.size} test files in #{name} queue in #{duration.round(2)} seconds"
end

def max_test_failed?
@multi_queue.max_test_failed?
end

def preload_files!
return if @preload_files.empty? || @preloaded

@preload_files.each do |file|
require ::File.expand_path(file)
end

@preloaded = true
end
end

class Supervisor < SimpleDelegator
def initialize(multi_queue:)
super(multi_queue)
@multi_queue = multi_queue
end

def wait_for_workers
wait_statuses = @multi_queue.queues.map do |q|
q.supervisor.wait_for_workers do
yield
end
end
wait_statuses.all? { |status| status == true }
end

def queue_initialized?
all_queues_initialized = true
@multi_queue.queues.each do |q|
unless q.queue_initialized?
puts "Queue #{q.name} was not initialized"
all_queues_initialized = false
end
end
all_queues_initialized
end
end

private

def queues
@queues ||= @config.multi_queue_config["queues"].map do |name, files|
sub_queue_config = @config.dup.tap { |c| c.namespace = name }
SubQueue.new(
worker:CI::Queue::Redis::Worker.new(@redis_url, sub_queue_config, @redis),
multi_queue: self, name: name, test_files: files, preload_files: @config.multi_queue_config["preload_files"]
)
end.shuffle(random: Random.new(Digest::MD5.hexdigest(@config.worker_id.to_s).to_i(16)))
@current_queue ||= @queues.first
@queues
end
end
end
end
13 changes: 9 additions & 4 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ def call_pipelined(commands, redis_config)
end
end

def initialize(redis_url, config)
def initialize(redis_url, config, redis = nil)
@redis_url = redis_url
@config = config
if ::Redis::VERSION > "5.0.0"
@redis = ::Redis.new(
@redis = redis
@redis ||= if ::Redis::VERSION > "5.0.0"
::Redis.new(
url: redis_url,
# Booting a CI worker is costly, so in case of a Redis blip,
# it makes sense to retry for a while before giving up.
Expand All @@ -40,7 +41,7 @@ def initialize(redis_url, config)
custom: custom_config,
)
else
@redis = ::Redis.new(url: redis_url)
::Redis.new(url: redis_url)
end
end

Expand Down Expand Up @@ -126,6 +127,10 @@ def running
redis.zcard(key('running'))
end

def total
redis.get(key('total')).to_i
end

def to_a
redis.multi do |transaction|
transaction.lrange(key('queue'), 0, -1)
Expand Down
4 changes: 2 additions & 2 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class << self
class Worker < Base
attr_reader :total

def initialize(redis, config)
def initialize(redis_url, config, redis = nil)
@reserved_test = nil
@shutdown_required = false
super(redis, config)
super(redis_url, config, redis)
end

def distributed?
Expand Down
18 changes: 16 additions & 2 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,13 @@ def set_load_path
end

def load_tests
argv.sort.each do |f|
require File.expand_path(f)
if queue_config.multi_queue_config
queue.current_queue.preload_files!
queue.current_queue.load_tests!
else
argv.sort.each do |f|
require File.expand_path(f)
end
end
end

Expand Down Expand Up @@ -631,6 +636,15 @@ def parser
opts.on('--failing-test TEST_IDENTIFIER') do |identifier|
queue_config.failing_test = identifier
end

help = <<~EOS
The file path for multi-queue configuration. It should be a valid YAML file.
The file should map queue names to a list of their test files.
EOS
opts.on('--multi-queue-config PATH', help) do |path|
require 'yaml'
queue_config.multi_queue_config = YAML.load(File.read(path))
end
end
end

Expand Down

0 comments on commit 9452e6b

Please sign in to comment.