optimization Cleaner #39

Open: wants to merge 2 commits into base: master
4 changes: 4 additions & 0 deletions Gemfile
@@ -18,3 +18,7 @@ group :development, :test do
gem "rubocop"
gem "rubocop-rspec"
end

group :test do
gem "timecop"
end
20 changes: 20 additions & 0 deletions README.md
@@ -131,6 +131,26 @@ To avoid filling Redis storage with stale subscription data:

Heroku users should set the `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions).

You can also call `clean_channels` or `clean_subscriptions` directly and pass the expiration time in seconds as an argument. The argument takes precedence over `subscription_expiration_seconds` from the configuration.
For instance:

```ruby
GraphQL::AnyCable::Cleaner.clean_channels(100)
# or
GraphQL::AnyCable::Cleaner.clean_subscriptions(100)
```

This is helpful when `subscription_expiration_seconds` in the configuration has a different value (or is not set at all),
but you need to remove subscriptions and channels earlier without changing the configured value.

A zero value is not accepted by `clean_channels` or `clean_subscriptions`: when the resolved expiration is zero, the call returns without cleaning anything.
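
The precedence between the argument and the configured value can be sketched as follows. This is only an illustration, not the gem's code: `resolve_expiration` is a hypothetical helper that mirrors the guard used by `clean_channels` and `clean_subscriptions` in this PR.

```ruby
# Hypothetical helper (not part of the gem) mirroring the guard in
# clean_channels / clean_subscriptions.
# Returns the number of seconds to use, or nil when cleanup should be skipped.
def resolve_expiration(explicit, configured)
  # 0 is truthy in Ruby, so an explicit zero is NOT replaced by the configured value
  seconds = explicit || configured
  return nil if seconds.nil? || seconds.to_i.zero?

  seconds.to_i
end

resolve_expiration(100, 3600) # explicit argument wins
resolve_expiration(nil, 3600) # falls back to the configured value
resolve_expiration(0, 3600)   # zero is rejected, cleanup is skipped
resolve_expiration(nil, nil)  # nothing configured, cleanup is skipped
```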

### Recommendations

Run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically to keep Redis memory consumption from growing.
Before using either of them, configure the `subscription_expiration_seconds`
and `use_redis_object_on_cleanup` settings.

## Configuration

GraphQL-AnyCable uses [anyway_config] to configure itself. There are several possibilities to configure this gem:
65 changes: 51 additions & 14 deletions lib/graphql/anycable/cleaner.rb
@@ -5,35 +5,45 @@ module AnyCable
module Cleaner
extend self

MAX_RECORDS_AT_ONCE = 1_000

def clean
clean_channels
clean_subscriptions
clean_fingerprint_subscriptions
clean_topic_fingerprints
end

-def clean_channels
-  return unless config.subscription_expiration_seconds
+def clean_channels(expiration_seconds = nil)
+  expiration_seconds ||= config.subscription_expiration_seconds
+
+  return if expiration_seconds.nil? || expiration_seconds.to_i.zero?
   return unless config.use_redis_object_on_cleanup

-  redis.scan_each(match: "#{redis_key(adapter::CHANNEL_PREFIX)}*") do |key|
-    idle = redis.object("IDLETIME", key)
-    next if idle&.<= config.subscription_expiration_seconds
+  store_name = redis_key(adapter::CHANNELS_STORAGE_TIME)

-    redis.del(key)
-  end
+  remove_old_objects(store_name, expiration_seconds.to_i)
 end

-def clean_subscriptions
-  return unless config.subscription_expiration_seconds
+def clean_subscriptions(expiration_seconds = nil)
+  expiration_seconds ||= config.subscription_expiration_seconds
+
+  return if expiration_seconds.nil? || expiration_seconds.to_i.zero?
   return unless config.use_redis_object_on_cleanup

-  redis.scan_each(match: "#{redis_key(adapter::SUBSCRIPTION_PREFIX)}*") do |key|
-    idle = redis.object("IDLETIME", key)
-    next if idle&.<= config.subscription_expiration_seconds
+  store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)

-    redis.del(key)
-  end
+  remove_old_objects(store_name, expiration_seconds.to_i)
 end

# For cases, when we need to clear only `subscription time storage`
def clean_subscription_time_storage
clean_created_time_storage(redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME))
end

# For cases, when we need to clear only `channel time storage`
def clean_channel_time_storage
clean_created_time_storage(redis_key(adapter::CHANNELS_STORAGE_TIME))
end

def clean_fingerprint_subscriptions
@@ -74,6 +84,33 @@ def config
def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def remove_old_objects(store_name, expiration_seconds)
# Determine the time point before which the keys should be deleted
time_point = (Time.now - expiration_seconds).to_i

# iterating per 1000 records
loop do
# fetches keys, which need to be deleted
keys = redis.zrangebyscore(store_name, "-inf", time_point, limit: [0, MAX_RECORDS_AT_ONCE])

break if keys.empty?

redis.multi do |pipeline|
pipeline.del(*keys)
pipeline.zrem(store_name, keys)
end
end
end

# For cases, when the key was dropped, but it remains in the `subscription/channel time storage`
def clean_created_time_storage(storage_name)
redis.zscan_each(storage_name, count: MAX_RECORDS_AT_ONCE) do |key|
next if redis.exists?(key)

redis.zrem(storage_name, key)
end
end
end
end
end
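
The batching loop in `remove_old_objects` can be illustrated without Redis. The sketch below is an in-memory approximation under stated assumptions: a Hash of key to creation timestamp stands in for the sorted set, a second Hash stands in for the keys themselves, and `remove_old_objects_sketch` with its `batch_size:` and `now:` parameters is a hypothetical name, not code from this PR.

```ruby
# In-memory stand-ins (assumptions for illustration, not Redis calls):
#   index: Hash of key => creation timestamp, playing the role of the sorted set
#   store: Hash of key => value, playing the role of the keys themselves
def remove_old_objects_sketch(index, store, expiration_seconds, batch_size: 2, now: Time.now.to_i)
  cutoff = now - expiration_seconds
  removed = 0

  loop do
    # Analogue of ZRANGEBYSCORE store -inf cutoff LIMIT 0 batch_size
    batch = index.select { |_key, created_at| created_at <= cutoff }
                 .sort_by { |_key, created_at| created_at }
                 .first(batch_size)
                 .map(&:first)
    break if batch.empty?

    # Analogue of the MULTI block: drop the keys and their index entries together,
    # so the next iteration never sees the same batch again
    batch.each do |key|
      store.delete(key)
      index.delete(key)
    end
    removed += batch.size
  end

  removed
end

index = { "sub:a" => 100, "sub:b" => 150, "sub:c" => 190, "sub:d" => 400 }
store = index.keys.to_h { |key| [key, "payload"] }

# With now = 500 and a 300 second expiration the cutoff is 200,
# so only "sub:d" is newer than the cutoff and survives
remove_old_objects_sketch(index, store, 300, now: 500)
```

Because each batch is removed from the index before the next query, the loop terminates once no entry at or below the cutoff remains. The batch size of 2 is deliberately tiny here; the PR uses `MAX_RECORDS_AT_ONCE = 1_000`.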
8 changes: 4 additions & 4 deletions lib/graphql/anycable/tasks/clean_expired_subscriptions.rake
@@ -9,13 +9,13 @@ namespace :graphql do

namespace :clean do
 # Clean up old channels
-task :channels do
-  GraphQL::AnyCable::Cleaner.clean_channels
+task :channels, [:expire_seconds] do |_, args|
+  GraphQL::AnyCable::Cleaner.clean_channels(args[:expire_seconds]&.to_i)
 end

 # Clean up old subscriptions (they should have expired by themselves)
-task :subscriptions do
-  GraphQL::AnyCable::Cleaner.clean_subscriptions
+task :subscriptions, [:expire_seconds] do |_, args|
+  GraphQL::AnyCable::Cleaner.clean_subscriptions(args[:expire_seconds]&.to_i)
 end

# Clean up subscription_ids from event fingerprints for expired subscriptions
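
The `args[:expire_seconds]&.to_i` expression in the tasks above decides whether the cleaner receives an explicit value or falls back to configuration. A small sketch of that coercion follows; a plain Hash stands in for Rake's task arguments and `expire_seconds_from` is a hypothetical helper, both assumptions made for illustration.

```ruby
# A plain Hash stands in for Rake's task arguments here (assumption for illustration).
def expire_seconds_from(args)
  # Safe navigation keeps nil as nil, so the cleaner can fall back to
  # config.subscription_expiration_seconds; strings are coerced with to_i.
  args[:expire_seconds]&.to_i
end

expire_seconds_from({})                        # no argument given, stays nil
expire_seconds_from({ expire_seconds: "100" }) # task arguments arrive as strings
expire_seconds_from({ expire_seconds: "abc" }) # non-numeric input coerces to 0, which the cleaner skips
```

Assuming the task names follow the `graphql:anycable:clean` namespace mentioned in the README, an explicit value would presumably be passed as `rake "graphql:anycable:clean:channels[100]"`.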
44 changes: 31 additions & 13 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
@@ -3,7 +3,6 @@
require "anycable"
require "graphql/subscriptions"
require "graphql/anycable/errors"

# rubocop: disable Metrics/AbcSize, Metrics/LineLength, Metrics/MethodLength

# A subscriptions implementation that sends data as AnyCable broadcastings.
@@ -56,10 +55,12 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

-SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
-FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
-SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
-CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
+SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
+FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
+SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
+CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
+SUBSCRIPTIONS_STORAGE_TIME = "subscription-storage-time" # ZSET: Stores name and created_time of subscriptions
+CHANNELS_STORAGE_TIME = "channel-storage-time" # ZSET: Stores name and created_time of channels

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
@@ -131,7 +132,6 @@ def write_subscription(query, events)
# Store subscription_id in the channel state to cleanup on disconnect
write_subscription_id(channel, channel_uniq_id)


events.each do |event|
channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint)
end
@@ -145,15 +145,26 @@ def write_subscription(query, events)
}

 redis.multi do |pipeline|
-  pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
-  pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data)
+  full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"
+  full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_uniq_id}"
+
+  pipeline.sadd(full_channel_id, [subscription_id])
+  pipeline.mapped_hmset(full_subscription_id, data)
+
   events.each do |event|
     pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
     pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
   end
+
+  current_timestamp = Time.now.to_i
+
+  pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
+  pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)
+
   next unless config.subscription_expiration_seconds
-  pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
-  pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)
+
+  pipeline.expire(full_channel_id, config.subscription_expiration_seconds)
+  pipeline.expire(full_subscription_id, config.subscription_expiration_seconds)
 end
end

@@ -182,7 +193,10 @@ def delete_subscription(subscription_id)
fingerprint_subscriptions[redis_key(FINGERPRINTS_PREFIX) + topic] = score
end
 # Delete subscription itself
-pipeline.del(redis_key(SUBSCRIPTION_PREFIX) + subscription_id)
+full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"
+
+pipeline.del(full_subscription_id)
+pipeline.zrem(redis_key(SUBSCRIPTIONS_STORAGE_TIME), full_subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do |pipeline|
@@ -200,10 +214,14 @@ def delete_channel_subscriptions(channel_or_id)
# Missing in case disconnect happens before #execute
return unless channel_id

-redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id|
+full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_id}"
+
+redis.smembers(full_channel_id).each do |subscription_id|
   delete_subscription(subscription_id)
 end
-redis.del(redis_key(CHANNEL_PREFIX) + channel_id)
+
+redis.del(full_channel_id)
+redis.zrem(redis_key(CHANNELS_STORAGE_TIME), full_channel_id)
end

private
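
The changes to this file keep a creation-time index next to every subscription and channel key: an entry is added on write and removed on explicit delete. Since the keys also get a TTL via `expire`, a key can vanish while its index entry stays behind, which is the case `clean_created_time_storage` in the cleaner handles. The sketch below models that bookkeeping with plain Hashes; `write_entry`, `delete_entry` and `prune_stale_index` are hypothetical names used only for illustration.

```ruby
# In-memory model (assumption for illustration): `keyspace` plays the role of
# the Redis keys, `time_index` the role of a *-storage-time sorted set.

# Mirrors write_subscription: store the data and record its creation time.
def write_entry(keyspace, time_index, key, data, now: Time.now.to_i)
  keyspace[key] = data
  time_index[key] = now
end

# Mirrors delete_subscription: remove the data and its index entry together.
def delete_entry(keyspace, time_index, key)
  keyspace.delete(key)
  time_index.delete(key)
end

# Mirrors clean_created_time_storage: drop index entries whose key is gone.
def prune_stale_index(keyspace, time_index)
  time_index.delete_if { |key, _created_at| !keyspace.key?(key) }
end

keyspace = {}
time_index = {}
write_entry(keyspace, time_index, "subscription:1", { query: "..." }, now: 100)
write_entry(keyspace, time_index, "subscription:2", { query: "..." }, now: 200)

delete_entry(keyspace, time_index, "subscription:1") # both structures stay in sync
keyspace.delete("subscription:2")                    # simulated TTL expiry: only the key disappears
stale = time_index.keys                              # "subscription:2" is still indexed

prune_stale_index(keyspace, time_index)              # the stale index entry is dropped
```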
15 changes: 11 additions & 4 deletions spec/graphql/anycable_spec.rb
@@ -135,13 +135,20 @@
end

 it "removes subscription from redis" do
-  expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
-  expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
+  subscription = "graphql-subscription:#{subscription_id}"
+  channel = "graphql-channel:#{subscription_id}"
+
+  expect(redis.exists?(subscription)).to be true
+  expect(redis.exists?(channel)).to be true
   expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
+  expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be true
+  expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be true
   subject
-  expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
+  expect(redis.exists?(channel)).to be false
   expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
-  expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
+  expect(redis.exists?(subscription)).to be false
+  expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be false
+  expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be false
 end
end
