Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimization Cleaner #39

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Optimization GraphQL::AnyCable::Cleaner. Use a separate key for stori…
…ng created_time of subscriptions and channels
prog-supdex committed Sep 20, 2023
commit 1a1981e9f37e0a35e6ee68b7b8b84fae8c15b8c6
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -131,6 +131,12 @@ To avoid filling Redis storage with stale subscription data:

Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions).

### Recommendations

You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption,
but before using `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean`, you should configure `subscription_expiration_seconds`
and `use_redis_object_on_cleanup` settings

## Configuration

GraphQL-AnyCable uses [anyway_config] to configure itself. There are several possibilities to configure this gem:
53 changes: 43 additions & 10 deletions lib/graphql/anycable/cleaner.rb
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@ module AnyCable
module Cleaner
extend self

MAX_RECORDS_AT_ONCE = 1_000

def clean
clean_channels
clean_subscriptions
@@ -16,24 +18,28 @@ def clean_channels
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::CHANNEL_PREFIX)}*") do |key|
idle = redis.object("IDLETIME", key)
next if idle&.<= config.subscription_expiration_seconds
store_name = redis_key(adapter::CHANNELS_STORAGE_TIME)

redis.del(key)
end
remove_old_objects(store_name)
end

def clean_subscriptions
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::SUBSCRIPTION_PREFIX)}*") do |key|
idle = redis.object("IDLETIME", key)
next if idle&.<= config.subscription_expiration_seconds
store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)

redis.del(key)
end
remove_old_objects(store_name)
end

# For cases, when we need to clear only `subscription time storage`
def clean_subscription_time_storage
clean_created_time_storage(redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME))
end

# For cases, when we need to clear only `channel time storage`
def clean_channel_time_storage
clean_created_time_storage(redis_key(adapter::CHANNELS_STORAGE_TIME))
end

def clean_fingerprint_subscriptions
@@ -74,6 +80,33 @@ def config
def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def remove_old_objects(store_name)
# Determine the time point before which the keys should be deleted
time_point = (Time.now - config.subscription_expiration_seconds).to_i

# iterating per 1000 records
loop do
# fetches keys, which need to be deleted
keys = redis.zrangebyscore(store_name, "-inf", time_point, limit: [0, MAX_RECORDS_AT_ONCE])

break if keys.empty?

redis.multi do |pipeline|
pipeline.del(*keys)
pipeline.zrem(store_name, keys)
end
end
end

# For cases, when the key was dropped, but it remains in the `subscription/channel time storage`
def clean_created_time_storage(storage_name)
redis.zscan_each(storage_name, count: MAX_RECORDS_AT_ONCE) do |key|
next if redis.exists?(key)

redis.zrem(storage_name, key)
end
end
end
end
end
49 changes: 35 additions & 14 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@
require "anycable"
require "graphql/subscriptions"
require "graphql/anycable/errors"

# rubocop: disable Metrics/AbcSize, Metrics/LineLength, Metrics/MethodLength

# A subscriptions implementation that sends data as AnyCable broadcastings.
@@ -56,10 +55,12 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
SUBSCRIPTIONS_STORAGE_TIME = "subscription-storage-time" # ZSET: Stores name and created_time of subscriptions
CHANNELS_STORAGE_TIME = "channel-storage-time" # ZSET: Stores name and created_time of channels

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
@@ -131,7 +132,6 @@ def write_subscription(query, events)
# Store subscription_id in the channel state to cleanup on disconnect
write_subscription_id(channel, channel_uniq_id)


events.each do |event|
channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint)
end
@@ -145,15 +145,29 @@ def write_subscription(query, events)
}

redis.multi do |pipeline|
pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data)
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_uniq_id}"

pipeline.sadd(full_channel_id, [subscription_id])
pipeline.mapped_hmset(full_subscription_id, data)

events.each do |event|
pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end
next unless config.subscription_expiration_seconds
pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)

# add the records to the storages if subscription_expiration_seconds is nil
unless config.subscription_expiration_seconds
current_timestamp = Time.now.to_i

pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)

next
end

pipeline.expire(full_channel_id, config.subscription_expiration_seconds)
pipeline.expire(full_subscription_id, config.subscription_expiration_seconds)
end
end

@@ -182,7 +196,10 @@ def delete_subscription(subscription_id)
fingerprint_subscriptions[redis_key(FINGERPRINTS_PREFIX) + topic] = score
end
# Delete subscription itself
pipeline.del(redis_key(SUBSCRIPTION_PREFIX) + subscription_id)
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"

pipeline.del(full_subscription_id)
pipeline.zrem(redis_key(SUBSCRIPTIONS_STORAGE_TIME), full_subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do |pipeline|
@@ -200,10 +217,14 @@ def delete_channel_subscriptions(channel_or_id)
# Missing in case disconnect happens before #execute
return unless channel_id

redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id|
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_id}"

redis.smembers(full_channel_id).each do |subscription_id|
delete_subscription(subscription_id)
end
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)

redis.del(full_channel_id)
redis.zrem(redis_key(CHANNELS_STORAGE_TIME), full_channel_id)
end

private
15 changes: 11 additions & 4 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
@@ -135,13 +135,20 @@
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
subscription = "graphql-subscription:#{subscription_id}"
channel = "graphql-channel:#{subscription_id}"

expect(redis.exists?(subscription)).to be true
expect(redis.exists?(channel)).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be true
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be true
subject
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
expect(redis.exists?(channel)).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
expect(redis.exists?(subscription)).to be false
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be false
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be false
end
end