diff --git a/ruby/lib/ci/queue.rb b/ruby/lib/ci/queue.rb index 8cf0a167..7634bc80 100644 --- a/ruby/lib/ci/queue.rb +++ b/ruby/lib/ci/queue.rb @@ -55,7 +55,12 @@ def from_uri(url, config) File when 'redis', 'rediss' require 'ci/queue/redis' - Redis + if config.multi_queue_config + require 'ci/queue/multi_queue' + MultiQueue + else + Redis + end else raise ArgumentError, "Don't know how to handle #{uri.scheme} URLs" end diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 1941b81e..813f249b 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -6,6 +6,7 @@ class Configuration attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds + attr_accessor :multi_queue_config attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -37,7 +38,8 @@ def initialize( grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil, max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil, queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil, - export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil) + export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, + multi_queue_config: nil) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @failure_file = failure_file @@ -64,6 +66,7 @@ def initialize( @warnings_file = warnings_file @debug_log = debug_log @max_missed_heartbeat_seconds = max_missed_heartbeat_seconds + @multi_queue_config = multi_queue_config end def queue_init_timeout diff --git a/ruby/lib/ci/queue/multi_queue.rb b/ruby/lib/ci/queue/multi_queue.rb new file mode 100644 index 00000000..5eb14d4f --- /dev/null +++ b/ruby/lib/ci/queue/multi_queue.rb @@ -0,0 +1,220 @@ +# frozen_string_literal: true + +require "benchmark" +require "forwardable" + +module CI + module Queue + class MultiQueue + include Common + extend Forwardable + + class << self + def from_uri(uri, config) + new(uri.to_s, config) + end + end + + attr_reader :redis_url, :config, :redis, :current_queue + + def initialize(redis_url, config) + @redis_url = redis_url + @config = config + if ::Redis::VERSION > "5.0.0" + @redis = ::Redis.new( + url: redis_url, + # Booting a CI worker is costly, so in case of a Redis blip, + # it makes sense to retry for a while before giving up. + ) + else + @redis = ::Redis.new(url: redis_url) + end + @shutdown_required = false + end + + def distributed? + true + end + + def retrying? + queues.any?(&:retrying?) + end + + def expired? + queues.any?(&:expired?) + end + + def exhausted? + queues.all?(&:exhausted?) + end + + def total + queues.each { |q| q.wait_for_master(timeout: config.queue_init_timeout) } + queues.sum(&:total) + end + + def size + queues.sum(&:size) + end + + def progress + total - size + end + + def remaining + queues.sum(&:remaining) + end + + def running + queues.sum(&:running) + end + + def poll + queues.each do |q| + @current_queue = q + + begin + if q.exhausted? + puts "# All tests executed in #{q.name} queue, skipping..." + next + end + + q.load_tests! + + puts "# Processing #{q.size} tests in #{q.name} queue..." + + q.poll do |test| + yield test + end + rescue *q.class::CONNECTION_ERRORS + end + end + end + + def max_test_failed? + return false if config.max_test_failed.nil? + + queues.sum(&:test_failed) >= config.max_test_failed + end + + def build + @build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config) + end + + def supervisor + @supervisor ||= Supervisor.new(multi_queue: self) + end + + def retry_queue + # TODO: implement + end + + def created_at=(time) + queues.each { |q| q.created_at = time } + end + + def shutdown! + @shutdown_required = true + end + + def shutdown_required? + @shutdown_required + end + + def_delegators :@current_queue, :requeue, :populate, :release!, :increment_test_failed + + # TODO: move heartbeat into module + def boot_heartbeat_process!; end + + def with_heartbeat(id) + yield + end + + def ensure_heartbeat_thread_alive!; end + + def stop_heartbeat!; end + + class SubQueue < SimpleDelegator + attr_reader :name + + def initialize(worker:, multi_queue:, name:, test_files:, preload_files:) + super(worker) + @name = name + @test_files = test_files + @multi_queue = multi_queue + @preload_files = preload_files + @preloaded = false + end + + def load_tests! + preload_files! + + duration = Benchmark.realtime do + @test_files.each do |test_file| + require ::File.expand_path(test_file) + end + end + + puts "# Loaded #{@test_files.size} test files in #{name} queue in #{duration.round(2)} seconds" + end + + def max_test_failed? + @multi_queue.max_test_failed? + end + + private + + def preload_files! + return if @preload_files.empty? || @preloaded + + @preload_files.each do |file| + require ::File.expand_path(file) + end + + @preloaded = true + end + end + + class Supervisor < SimpleDelegator + def initialize(multi_queue:) + super(multi_queue) + @multi_queue = multi_queue + end + + def wait_for_workers + wait_statuses = @multi_queue.queues.map do |q| + q.supervisor.wait_for_workers do + yield + end + end + wait_statuses.all? { |status| status == true } + end + + def queue_initialized? + all_queues_initialized = true + @multi_queue.queues.each do |q| + unless q.queue_initialized? + puts "Queue #{q.name} was not initialized" + all_queues_initialized = false + end + end + all_queues_initialized + end + end + + private + + def queues + @queues ||= @config.multi_queue_config["queues"].map do |name, files| + sub_queue_config = @config.dup.tap { |c| c.namespace = name } + SubQueue.new( + worker:CI::Queue::Redis::Worker.new(@redis_url, sub_queue_config, @redis), + multi_queue: self, name: name, test_files: files, preload_files: @config.multi_queue_config["preload_files"] + ) + end.shuffle(random: Random.new(Digest::MD5.hexdigest(@config.worker_id.to_s).to_i(16))) + @current_queue ||= @queues.first + @queues + end + end + end +end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 6b4ec153..34a91030 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -27,11 +27,12 @@ def call_pipelined(commands, redis_config) end end - def initialize(redis_url, config) + def initialize(redis_url, config, redis = nil) @redis_url = redis_url @config = config - if ::Redis::VERSION > "5.0.0" - @redis = ::Redis.new( + @redis = redis + @redis ||= if ::Redis::VERSION > "5.0.0" + ::Redis.new( url: redis_url, # Booting a CI worker is costly, so in case of a Redis blip, # it makes sense to retry for a while before giving up. @@ -40,7 +41,7 @@ def initialize(redis_url, config) custom: custom_config, ) else - @redis = ::Redis.new(url: redis_url) + ::Redis.new(url: redis_url) end end @@ -126,6 +127,10 @@ def running redis.zcard(key('running')) end + def total + redis.get(key('total')).to_i + end + def to_a redis.multi do |transaction| transaction.lrange(key('queue'), 0, -1) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index ff312563..c6327bfc 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -15,10 +15,10 @@ class << self class Worker < Base attr_reader :total - def initialize(redis, config) + def initialize(redis_url, config, redis = nil) @reserved_test = nil @shutdown_required = false - super(redis, config) + super(redis_url, config, redis) end def distributed? diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index 1a1ab672..24f41aa6 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -350,8 +350,12 @@ def set_load_path end def load_tests - argv.sort.each do |f| - require File.expand_path(f) + if queue_config.multi_queue_config + queue.current_queue.load_tests! + else + argv.sort.each do |f| + require File.expand_path(f) + end end end @@ -631,6 +635,15 @@ def parser opts.on('--failing-test TEST_IDENTIFIER') do |identifier| queue_config.failing_test = identifier end + + help = <<~EOS + The file path for multi-queue configuration. It should be a valid YAML file. + The file should map queue names to a list of their test files. + EOS + opts.on('--multi-queue-config PATH', help) do |path| + require 'yaml' + queue_config.multi_queue_config = YAML.load(File.read(path)) + end end end