From 99f9ec4c2973d645b6acef4a1aab938267964ad8 Mon Sep 17 00:00:00 2001 From: prog-supdex Date: Wed, 20 Sep 2023 17:13:17 +0200 Subject: [PATCH] add spec for Cleaner. Add ability to set "subscription_expiration_seconds" as argument --- Gemfile | 4 + README.md | 14 ++ lib/graphql/anycable/cleaner.rb | 20 +- .../tasks/clean_expired_subscriptions.rake | 8 +- .../subscriptions/anycable_subscriptions.rb | 11 +- spec/graphql/cleaner_spec.rb | 184 ++++++++++++++++++ 6 files changed, 222 insertions(+), 19 deletions(-) create mode 100644 spec/graphql/cleaner_spec.rb diff --git a/Gemfile b/Gemfile index e74ef60..d057d41 100644 --- a/Gemfile +++ b/Gemfile @@ -18,3 +18,7 @@ group :development, :test do gem "rubocop" gem "rubocop-rspec" end + +group :test do + gem "timecop" +end diff --git a/README.md b/README.md index 9ddd1c1..1bd1776 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,20 @@ To avoid filling Redis storage with stale subscription data: Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions). +You can also call specific commands `clean_channels` or `clean_subscriptions` with passed `subscription_expiration_seconds` as an argument. +For instance + +```ruby + GraphQL::AnyCable::Cleaner.clean_channels(100) + # or + GraphQL::AnyCable::Cleaner.clean_subscriptions(100) +``` + +It will be helpful in cases when you have another value, `subscription_expiration_seconds` (or you don't have one) in `configuration`, +but it needs to remove `subscriptions` and `channels` earlier without changing `subscription_expiration_seconds` in `configuration` + +You can't put a zero value for `clean_channels` or `clean_subscriptions` methods + ### Recommendations You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption, diff --git a/lib/graphql/anycable/cleaner.rb b/lib/graphql/anycable/cleaner.rb index 3fb29fe..8b1ced9 100644 --- a/lib/graphql/anycable/cleaner.rb +++ b/lib/graphql/anycable/cleaner.rb @@ -14,22 +14,26 @@ def clean clean_topic_fingerprints end - def clean_channels - return unless config.subscription_expiration_seconds + def clean_channels(expiration_seconds = nil) + expiration_seconds ||= config.subscription_expiration_seconds + + return if expiration_seconds.nil? || expiration_seconds.to_i.zero? return unless config.use_redis_object_on_cleanup store_name = redis_key(adapter::CHANNELS_STORAGE_TIME) - remove_old_objects(store_name) + remove_old_objects(store_name, expiration_seconds.to_i) end - def clean_subscriptions - return unless config.subscription_expiration_seconds + def clean_subscriptions(expiration_seconds = nil) + expiration_seconds ||= config.subscription_expiration_seconds + + return if expiration_seconds.nil? || expiration_seconds.to_i.zero? return unless config.use_redis_object_on_cleanup store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME) - remove_old_objects(store_name) + remove_old_objects(store_name, expiration_seconds.to_i) end # For cases, when we need to clear only `subscription time storage` @@ -81,9 +85,9 @@ def redis_key(prefix) "#{config.redis_prefix}-#{prefix}" end - def remove_old_objects(store_name) + def remove_old_objects(store_name, expiration_seconds) # Determine the time point before which the keys should be deleted - time_point = (Time.now - config.subscription_expiration_seconds).to_i + time_point = (Time.now - expiration_seconds).to_i # iterating per 1000 records loop do diff --git a/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake b/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake index c9043a6..3a7c474 100644 --- a/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake +++ b/lib/graphql/anycable/tasks/clean_expired_subscriptions.rake @@ -9,13 +9,13 @@ namespace :graphql do namespace :clean do # Clean up old channels - task :channels do - GraphQL::AnyCable::Cleaner.clean_channels + task :channels, [:expire_seconds] do |_, args| + GraphQL::AnyCable::Cleaner.clean_channels(args[:expire_seconds]&.to_i) end # Clean up old subscriptions (they should have expired by themselves) - task :subscriptions do - GraphQL::AnyCable::Cleaner.clean_subscriptions + task :subscriptions, [:expire_seconds] do |_, args| + GraphQL::AnyCable::Cleaner.clean_subscriptions(args[:expire_seconds]&.to_i) end # Clean up subscription_ids from event fingerprints for expired subscriptions diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 71907f6..1e64f69 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -156,15 +156,12 @@ def write_subscription(query, events) pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id]) end - # add the records to the storages if subscription_expiration_seconds is nil - unless config.subscription_expiration_seconds - current_timestamp = Time.now.to_i + current_timestamp = Time.now.to_i - pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id) - pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id) + pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id) + pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id) - next - end + next unless config.subscription_expiration_seconds pipeline.expire(full_channel_id, config.subscription_expiration_seconds) pipeline.expire(full_subscription_id, config.subscription_expiration_seconds) diff --git a/spec/graphql/cleaner_spec.rb b/spec/graphql/cleaner_spec.rb new file mode 100644 index 0000000..6e82d9c --- /dev/null +++ b/spec/graphql/cleaner_spec.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +require "timecop" + +RSpec.describe GraphQL::AnyCable::Cleaner do + let(:query) do + <<~GRAPHQL + subscription SomeSubscription { productUpdated { id } } + GRAPHQL + end + + let(:channel) do + socket = double("Socket", istate: AnyCable::Socket::State.new({})) + connection = double("Connection", anycable_socket: socket) + double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection) + end + + let(:subscription_id) do + "some-truly-random-number" + end + + let(:redis) { GraphQL::AnyCable.redis } + + before do + AnycableSchema.execute( + query: query, + context: { channel: channel, subscription_id: subscription_id }, + variables: {}, + operation_name: "SomeSubscription", + ) + end + + describe ".clean" do + + end + + describe ".clean_subscriptions" do + context "when expired_seconds passed via argument" do + context "when subscriptions are expired" do + let(:lifetime_in_seconds) { 10 } + + it "cleans subscriptions" do + expect(redis.keys("graphql-subscription:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions(lifetime_in_seconds) + end + + expect(redis.keys("graphql-subscription:*").count).to be 0 + end + end + + context "when subscriptions are not expired" do + let(:lifetime_in_seconds) { 100 } + + it "not cleans subscriptions" do + described_class.clean_subscriptions(lifetime_in_seconds) + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + context "when expired_seconds passed via config" do + context "when subscriptions are expired" do + around do |ex| + old_value = GraphQL::AnyCable.config.subscription_expiration_seconds + GraphQL::AnyCable.config.subscription_expiration_seconds = 10 + + ex.run + + GraphQL::AnyCable.config.subscription_expiration_seconds = old_value + end + + it "cleans subscriptions" do + expect(redis.keys("graphql-subscription:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions + end + + expect(redis.keys("graphql-subscription:*").count).to be 0 + end + end + + context "when config.subscription_expiration_seconds is nil" do + it "remains subscriptions" do + Timecop.freeze(Time.now + 10) do + described_class.clean_subscriptions + end + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + context "when an expiration_seconds is not positive integer" do + it "does not clean subscriptions" do + expect(described_class).to_not receive(:remove_old_objects) + + described_class.clean_subscriptions("") + + expect(redis.keys("graphql-subscription:*").count).to be > 0 + end + end + end + + describe ".clean_channels" do + context "when expired_seconds passed via argument" do + context "when channels are expired" do + let(:lifetime_in_seconds) { 10 } + + it "cleans subscriptions" do + expect(redis.keys("graphql-channel:*").count).to be > 0 + + Timecop.freeze(Time.now + 10) do + described_class.clean_channels(lifetime_in_seconds) + end + + expect(redis.keys("graphql-channel:*").count).to be 0 + end + end + + context "when channels are not expired" do + let(:lifetime_in_seconds) { 100 } + + it "does not clean channels" do + described_class.clean_channels(lifetime_in_seconds) + + expect(redis.keys("graphql-channel:*").count).to be > 0 + end + end + end + + context "when an expiration_seconds is not positive integer" do + it "does not clean channels" do + expect(described_class).to_not receive(:remove_old_objects) + + described_class.clean_channels("") + + expect(redis.keys("graphql-channel:*").count).to be > 0 + end + end + end + + describe ".clean_fingerprint_subscriptions" do + context "when subscription is blank" do + subject do + AnycableSchema.subscriptions.delete_subscription(subscription_id) + + described_class.clean_fingerprint_subscriptions + end + + it "cleans graphql-subscriptions" do + subscriptions_key = redis.keys("graphql-subscriptions:*")[0] + + expect(redis.smembers(subscriptions_key).empty?).to be false + + subject + + expect(redis.smembers(subscriptions_key).empty?).to be true + end + end + end + + describe ".clean_topic_fingerprints" do + subject do + # Emulate situation, when subscriptions in fingerprints are orphan + redis.scan_each(match: "graphql-subscriptions:*").each do |k| + redis.del(k) + end + + described_class.clean_topic_fingerprints + end + + it "cleans fingerprints" do + expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be false + + subject + + expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be true + end + end +end