From 1a1981e9f37e0a35e6ee68b7b8b84fae8c15b8c6 Mon Sep 17 00:00:00 2001 From: prog-supdex Date: Mon, 18 Sep 2023 17:29:18 +0200 Subject: [PATCH 1/2] Optimization GraphQL::AnyCable::Cleaner. Use a separate key for storing created_time of subscriptions and channels --- README.md | 6 +++ lib/graphql/anycable/cleaner.rb | 53 +++++++++++++++---- .../subscriptions/anycable_subscriptions.rb | 49 ++++++++++++----- spec/graphql/anycable_spec.rb | 15 ++++-- 4 files changed, 95 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 5f64a7e..9ddd1c1 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,12 @@ To avoid filling Redis storage with stale subscription data: Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions). +### Recommendations + +You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption, +but before using `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean`, you should configure `subscription_expiration_seconds` +and `use_redis_object_on_cleanup` settings + ## Configuration GraphQL-AnyCable uses [anyway_config] to configure itself. There are several possibilities to configure this gem: diff --git a/lib/graphql/anycable/cleaner.rb b/lib/graphql/anycable/cleaner.rb index 77f2777..3fb29fe 100644 --- a/lib/graphql/anycable/cleaner.rb +++ b/lib/graphql/anycable/cleaner.rb @@ -5,6 +5,8 @@ module AnyCable module Cleaner extend self + MAX_RECORDS_AT_ONCE = 1_000 + def clean clean_channels clean_subscriptions @@ -16,24 +18,28 @@ def clean_channels return unless config.subscription_expiration_seconds return unless config.use_redis_object_on_cleanup - redis.scan_each(match: "#{redis_key(adapter::CHANNEL_PREFIX)}*") do |key| - idle = redis.object("IDLETIME", key) - next if idle&.<= config.subscription_expiration_seconds + store_name = redis_key(adapter::CHANNELS_STORAGE_TIME) - redis.del(key) - end + remove_old_objects(store_name) end def clean_subscriptions return unless config.subscription_expiration_seconds return unless config.use_redis_object_on_cleanup - redis.scan_each(match: "#{redis_key(adapter::SUBSCRIPTION_PREFIX)}*") do |key| - idle = redis.object("IDLETIME", key) - next if idle&.<= config.subscription_expiration_seconds + store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME) - redis.del(key) - end + remove_old_objects(store_name) + end + + # For cases, when we need to clear only `subscription time storage` + def clean_subscription_time_storage + clean_created_time_storage(redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)) + end + + # For cases, when we need to clear only `channel time storage` + def clean_channel_time_storage + clean_created_time_storage(redis_key(adapter::CHANNELS_STORAGE_TIME)) end def clean_fingerprint_subscriptions @@ -74,6 +80,33 @@ def config def redis_key(prefix) "#{config.redis_prefix}-#{prefix}" end + + def remove_old_objects(store_name) + # Determine the time point before which the keys should be deleted + time_point = (Time.now - config.subscription_expiration_seconds).to_i + + # iterating per 1000 records + loop do + # fetches keys, which need to be deleted + keys = redis.zrangebyscore(store_name, "-inf", time_point, limit: [0, MAX_RECORDS_AT_ONCE]) + + break if keys.empty? + + redis.multi do |pipeline| + pipeline.del(*keys) + pipeline.zrem(store_name, keys) + end + end + end + + # For cases, when the key was dropped, but it remains in the `subscription/channel time storage` + def clean_created_time_storage(storage_name) + redis.zscan_each(storage_name, count: MAX_RECORDS_AT_ONCE) do |key| + next if redis.exists?(key) + + redis.zrem(storage_name, key) + end + end end end end diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 6dec5d8..71907f6 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -3,7 +3,6 @@ require "anycable" require "graphql/subscriptions" require "graphql/anycable/errors" - # rubocop: disable Metrics/AbcSize, Metrics/LineLength, Metrics/MethodLength # A subscriptions implementation that sends data as AnyCable broadcastings. @@ -56,10 +55,12 @@ class AnyCableSubscriptions < GraphQL::Subscriptions def_delegators :"GraphQL::AnyCable", :redis, :config - SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, … - FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic - SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint - CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup + SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, … + FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic + SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint + CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup + SUBSCRIPTIONS_STORAGE_TIME = "subscription-storage-time" # ZSET: Stores name and created_time of subscriptions + CHANNELS_STORAGE_TIME = "channel-storage-time" # ZSET: Stores name and created_time of channels # @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)` def initialize(serializer: Serialize, **rest) @@ -131,7 +132,6 @@ def write_subscription(query, events) # Store subscription_id in the channel state to cleanup on disconnect write_subscription_id(channel, channel_uniq_id) - events.each do |event| channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint) end @@ -145,15 +145,29 @@ def write_subscription(query, events) } redis.multi do |pipeline| - pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id]) - pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data) + full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}" + full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_uniq_id}" + + pipeline.sadd(full_channel_id, [subscription_id]) + pipeline.mapped_hmset(full_subscription_id, data) + events.each do |event| pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint) pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id]) end - next unless config.subscription_expiration_seconds - pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds) - pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds) + + # add the records to the storages if subscription_expiration_seconds is nil + unless config.subscription_expiration_seconds + current_timestamp = Time.now.to_i + + pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id) + pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id) + + next + end + + pipeline.expire(full_channel_id, config.subscription_expiration_seconds) + pipeline.expire(full_subscription_id, config.subscription_expiration_seconds) end end @@ -182,7 +196,10 @@ def delete_subscription(subscription_id) fingerprint_subscriptions[redis_key(FINGERPRINTS_PREFIX) + topic] = score end # Delete subscription itself - pipeline.del(redis_key(SUBSCRIPTION_PREFIX) + subscription_id) + full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}" + + pipeline.del(full_subscription_id) + pipeline.zrem(redis_key(SUBSCRIPTIONS_STORAGE_TIME), full_subscription_id) end # Clean up fingerprints that doesn't have any subscriptions left redis.pipelined do |pipeline| @@ -200,10 +217,14 @@ def delete_channel_subscriptions(channel_or_id) # Missing in case disconnect happens before #execute return unless channel_id - redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id| + full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_id}" + + redis.smembers(full_channel_id).each do |subscription_id| delete_subscription(subscription_id) end - redis.del(redis_key(CHANNEL_PREFIX) + channel_id) + + redis.del(full_channel_id) + redis.zrem(redis_key(CHANNELS_STORAGE_TIME), full_channel_id) end private diff --git a/spec/graphql/anycable_spec.rb b/spec/graphql/anycable_spec.rb index 7149d1b..b8a31a3 100644 --- a/spec/graphql/anycable_spec.rb +++ b/spec/graphql/anycable_spec.rb @@ -135,13 +135,20 @@ end it "removes subscription from redis" do - expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true - expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true + subscription = "graphql-subscription:#{subscription_id}" + channel = "graphql-channel:#{subscription_id}" + + expect(redis.exists?(subscription)).to be true + expect(redis.exists?(channel)).to be true expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true + expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be true + expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be true subject - expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false + expect(redis.exists?(channel)).to be false expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false - expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false + expect(redis.exists?(subscription)).to be false + expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be false + expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be false end end From aab684c812115dff4510aaf7fab8d05fa8a6dbdb Mon Sep 17 00:00:00 2001 From: prog-supdex Date: Wed, 20 Sep 2023 17:13:17 +0200 Subject: [PATCH 2/2] add spec for Cleaner. Add ability to set "subscription_expiration_seconds" as argument --- Gemfile | 4 + README.md | 14 ++ lib/graphql/anycable/cleaner.rb | 20 +- .../tasks/clean_expired_subscriptions.rake | 8 +- .../subscriptions/anycable_subscriptions.rb | 11 +- spec/graphql/cleaner_spec.rb | 180 ++++++++++++++++++ 6 files changed, 218 insertions(+), 19 deletions(-) create mode 100644 spec/graphql/cleaner_spec.rb diff --git a/Gemfile b/Gemfile index e74ef60..d057d41 100644 --- a/Gemfile +++ b/Gemfile @@ -18,3 +18,7 @@ group :development, :test do gem "rubocop" gem "rubocop-rspec" end + +group :test do + gem "timecop" +end diff --git a/README.md b/README.md index 9ddd1c1..1bd1776 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,20 @@ To avoid filling Redis storage with stale subscription data: Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions). +You can also call specific commands `clean_channels` or `clean_subscriptions` with passed `subscription_expiration_seconds` as an argument. +For instance + +```ruby + GraphQL::AnyCable::Cleaner.clean_channels(100) + # or + GraphQL::AnyCable::Cleaner.clean_subscriptions(100) +``` + +It will be helpful in cases when you have another value, `subscription_expiration_seconds` (or you don't have one) in `configuration`, +but it needs to remove `subscriptions` and `channels` earlier without changing `subscription_expiration_seconds` in `configuration` + +You can't put a zero value for `clean_channels` or `clean_subscriptions` methods + ### Recommendations You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption, diff --git a/lib/graphql/anycable/cleaner.rb b/lib/graphql/anycable/cleaner.rb index 3fb29fe..8b1ced9 100644 --- a/lib/graphql/anycable/cleaner.rb +++ b/lib/graphql/anycable/cleaner.rb @@ -14,22 +14,26 @@ def clean clean_topic_fingerprints end - def clean_channels - return unless config.subscription_expiration_seconds + def clean_channels(expiration_seconds = nil) + expiration_seconds ||= config.subscription_expiration_seconds + + return if expiration_seconds.nil? || expiration_seconds.to_i.zero? return unless config.use_redis_object_on_cleanup store_name = redis_key(adapter::CHANNELS_STORAGE_TIME) - remove_old_objects(store_name) + remove_old_objects(store_name, expiration_seconds.to_i) end - def clean_subscriptions - return unless config.subscription_expiration_seconds + def clean_subscriptions(expiration_seconds = nil) + expiration_seconds ||= config.subscription_expiration_seconds + + return if expiration_seconds.nil? || expiration_seconds.to_i.zero? return unless config.use_redis_object_on_cleanup store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME) - remove_old_objects(store_name) + remove_old_objects(store_name, expiration_seconds.to_i) end # For cases, when we need to clear only `subscription time storage` @@ -81,9 +85,9 @@ def redis_key(prefix) "#{config.redis_prefix}-#{prefix}" end - def remove_old_objects(store_name) + def remove_old_objects(store_name, expiration_seconds) # Determine the time point before which the keys should be deleted - time_point = (Time.now - config.subscription_expiration_seconds).to_i + time_point = (Time.now - expiration_seconds).to_i # iterating per 1000 records loop do diff --git a/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake b/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake index c9043a6..3a7c474 100644 --- a/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake +++ b/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake @@ -9,13 +9,13 @@ namespace :graphql do namespace :clean do # Clean up old channels - task :channels do - GraphQL::AnyCable::Cleaner.clean_channels + task :channels, [:expire_seconds] do |_, args| + GraphQL::AnyCable::Cleaner.clean_channels(args[:expire_seconds]&.to_i) end # Clean up old subscriptions (they should have expired by themselves) - task :subscriptions do - GraphQL::AnyCable::Cleaner.clean_subscriptions + task :subscriptions, [:expire_seconds] do |_, args| + GraphQL::AnyCable::Cleaner.clean_subscriptions(args[:expire_seconds]&.to_i) end # Clean up subscription_ids from event fingerprints for expired subscriptions diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 71907f6..1e64f69 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -156,15 +156,12 @@ def write_subscription(query, events) pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id]) end - # add the records to the storages if subscription_expiration_seconds is nil - unless config.subscription_expiration_seconds - current_timestamp = Time.now.to_i + current_timestamp = Time.now.to_i - pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id) - pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id) + pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id) + pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id) - next - end + next unless config.subscription_expiration_seconds pipeline.expire(full_channel_id, config.subscription_expiration_seconds) pipeline.expire(full_subscription_id, config.subscription_expiration_seconds) diff --git a/spec/graphql/cleaner_spec.rb b/spec/graphql/cleaner_spec.rb new file mode 100644 index 0000000..62a3e77 --- /dev/null +++ b/spec/graphql/cleaner_spec.rb @@ -0,0 +1,180 @@ +# frozen_string_literal: true + +require "timecop" + +RSpec.describe GraphQL::AnyCable::Cleaner do + let(:query) do + <<~GRAPHQL + subscription SomeSubscription { productUpdated { id } } + GRAPHQL + end + + let(:channel) do + socket = double("Socket", istate: AnyCable::Socket::State.new({})) + connection = double("Connection", anycable_socket: socket) + double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection) + end + + let(:subscription_id) do + "some-truly-random-number" + end + + let(:redis) { GraphQL::AnyCable.redis } + + before do + AnycableSchema.execute( + query: query, + context: { channel: channel, subscription_id: subscription_id }, + variables: {}, + operation_name: "SomeSubscription", + ) + end + + describe ".clean_subscriptions" do + context "when expired_seconds passed via argument" do + context "when subscriptions are expired" do + let(:lifetime_in_seconds) { 10 } + + it "cleans subscriptions" do + expect(redis.keys("graphql-subscription:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions(lifetime_in_seconds) + end + + expect(redis.keys("graphql-subscription:*").count).to be 0 + end + end + + context "when subscriptions are not expired" do + let(:lifetime_in_seconds) { 100 } + + it "not cleans subscriptions" do + described_class.clean_subscriptions(lifetime_in_seconds) + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + context "when expired_seconds passed via config" do + context "when subscriptions are expired" do + around do |ex| + old_value = GraphQL::AnyCable.config.subscription_expiration_seconds + GraphQL::AnyCable.config.subscription_expiration_seconds = 10 + + ex.run + + GraphQL::AnyCable.config.subscription_expiration_seconds = old_value + end + + it "cleans subscriptions" do + expect(redis.keys("graphql-subscription:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions + end + + expect(redis.keys("graphql-subscription:*").count).to be 0 + end + end + + context "when config.subscription_expiration_seconds is nil" do + it "remains subscriptions" do + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions + end + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + context "when an expiration_seconds is not positive integer" do + it "does not clean subscriptions" do + expect(described_class).to_not receive(:remove_old_objects) + + described_class.clean_subscriptions("") + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + describe ".clean_channels" do + context "when expired_seconds passed via argument" do + context "when channels are expired" do + let(:lifetime_in_seconds) { 10 } + + it "cleans subscriptions" do + expect(redis.keys("graphql-channel:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_channels(lifetime_in_seconds) + end + + expect(redis.keys("graphql-channel:*").count).to be 0 + end + end + + context "when channels are not expired" do + let(:lifetime_in_seconds) { 100 } + + it "does not clean channels" do + described_class.clean_channels(lifetime_in_seconds) + + expect(redis.keys("graphql-channel:*").count).to be > 0 + end + end + end + + context "when an expiration_seconds is not positive integer" do + it "does not clean channels" do + expect(described_class).to_not receive(:remove_old_objects) + + described_class.clean_channels("") + + expect(redis.keys("graphql-channel:*").count).to be > 0 + end + end + end + + describe ".clean_fingerprint_subscriptions" do + context "when subscription is blank" do + subject do + AnycableSchema.subscriptions.delete_subscription(subscription_id) + + described_class.clean_fingerprint_subscriptions + end + + it "cleans graphql-subscriptions" do + subscriptions_key = redis.keys("graphql-subscriptions:*")[0] + + expect(redis.smembers(subscriptions_key).empty?).to be false + + subject + + expect(redis.smembers(subscriptions_key).empty?).to be true + end + end + end + + describe ".clean_topic_fingerprints" do + subject do + # Emulate situation, when subscriptions in fingerprints are orphan + redis.scan_each(match: "graphql-subscriptions:*").each do |k| + redis.del(k) + end + + described_class.clean_topic_fingerprints + end + + it "cleans fingerprints" do + expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be false + + subject + + expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be true + end + end +end