Skip to content

Commit

Permalink
Merge pull request #36 from prog-supdex/feat/configurable_redis_prefix
Browse files Browse the repository at this point in the history
feat: Add configurable graphql redis_prefix
  • Loading branch information
palkan authored Sep 13, 2023
2 parents 2f3217e + 21b0393 commit 8391875
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 48 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
GRAPHQL_ANYCABLE_SUBSCRIPTION_EXPIRATION_SECONDS=604800
GRAPHQL_ANYCABLE_USE_REDIS_OBJECT_ON_CLEANUP=true
GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID=false
GRAPHQL_ANYCABLE_REDIS_PREFIX=graphql
```
2. YAML configuration files (note that this is `config/graphql_anycable.yml`, *not* `config/anycable.yml`):
Expand All @@ -151,13 +152,15 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
subscription_expiration_seconds: 300 # 5 minutes
use_redis_object_on_cleanup: false # For restricted redis installations
use_client_provided_uniq_id: false # To avoid problems with non-uniqueness of Apollo channel identifiers
redis_prefix: graphql # You can configure redis_prefix for anycable-graphql subscription prefixes. Default value "graphql"
```
3. Configuration from your application code:
```ruby
GraphQL::AnyCable.configure do |config|
config.subscription_expiration_seconds = 3600 # 1 hour
config.redis_prefix = "graphql" # on our side, we add `-` ourselves after the redis_prefix
end
```
Expand Down
1 change: 1 addition & 0 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Config < Anyway::Config
attr_config subscription_expiration_seconds: nil
attr_config use_redis_object_on_cleanup: true
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.
end
end
end
50 changes: 27 additions & 23 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

SUBSCRIPTION_PREFIX = "graphql-subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "graphql-fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "graphql-subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "graphql-channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
Expand All @@ -70,13 +70,13 @@ def initialize(serializer: Serialize, **rest)
# An event was triggered.
# Re-evaluate all subscribed queries and push the data over ActionCable.
def execute_all(event, object)
fingerprints = redis.zrange(FINGERPRINTS_PREFIX + event.topic, 0, -1)
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
return if fingerprints.empty?

fingerprint_subscription_ids = Hash[fingerprints.zip(
redis.pipelined do |pipeline|
fingerprints.map do |fingerprint|
pipeline.smembers(SUBSCRIPTIONS_PREFIX + fingerprint)
pipeline.smembers(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint)
end
end
)]
Expand All @@ -94,14 +94,14 @@ def execute_all(event, object)
def execute_grouped(fingerprint, subscription_ids, event, object)
return if subscription_ids.empty?

subscription_id = subscription_ids.find { |sid| redis.exists?(SUBSCRIPTION_PREFIX + sid) }
subscription_id = subscription_ids.find { |sid| redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) }
return unless subscription_id # All subscriptions has expired but haven't cleaned up yet

result = execute_update(subscription_id, event, object)
return unless result

# Having calculated the result _once_, send the same payload to all subscribers
deliver(SUBSCRIPTIONS_PREFIX + fingerprint, result)
deliver(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint, result)
end

# Disable this method as there is no fingerprint (it can be retrieved from subscription though)
Expand Down Expand Up @@ -133,7 +133,7 @@ def write_subscription(query, events)


events.each do |event|
channel.stream_from(SUBSCRIPTIONS_PREFIX + event.fingerprint)
channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint)
end

data = {
Expand All @@ -145,22 +145,22 @@ def write_subscription(query, events)
}

redis.multi do |pipeline|
pipeline.sadd(CHANNEL_PREFIX + channel_uniq_id, [subscription_id])
pipeline.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, data)
pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data)
events.each do |event|
pipeline.zincrby(FINGERPRINTS_PREFIX + event.topic, 1, event.fingerprint)
pipeline.sadd(SUBSCRIPTIONS_PREFIX + event.fingerprint, [subscription_id])
pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end
next unless config.subscription_expiration_seconds
pipeline.expire(CHANNEL_PREFIX + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(SUBSCRIPTION_PREFIX + subscription_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)
end
end

# Return the query from "storage" (in redis)
def read_subscription(subscription_id)
redis.mapped_hmget(
"#{SUBSCRIPTION_PREFIX}#{subscription_id}",
"#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}",
:query_string, :variables, :context, :operation_name
).tap do |subscription|
return if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key
Expand All @@ -172,17 +172,17 @@ def read_subscription(subscription_id)
end

def delete_subscription(subscription_id)
events = redis.hget(SUBSCRIPTION_PREFIX + subscription_id, :events)
events = redis.hget(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, :events)
events = events ? JSON.parse(events) : {}
fingerprint_subscriptions = {}
redis.pipelined do |pipeline|
events.each do |topic, fingerprint|
pipeline.srem(SUBSCRIPTIONS_PREFIX + fingerprint, subscription_id)
score = pipeline.zincrby(FINGERPRINTS_PREFIX + topic, -1, fingerprint)
fingerprint_subscriptions[FINGERPRINTS_PREFIX + topic] = score
pipeline.srem(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint, subscription_id)
score = pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + topic, -1, fingerprint)
fingerprint_subscriptions[redis_key(FINGERPRINTS_PREFIX) + topic] = score
end
# Delete subscription itself
pipeline.del(SUBSCRIPTION_PREFIX + subscription_id)
pipeline.del(redis_key(SUBSCRIPTION_PREFIX) + subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do |pipeline|
Expand All @@ -200,10 +200,10 @@ def delete_channel_subscriptions(channel_or_id)
# Missing in case disconnect happens before #execute
return unless channel_id

redis.smembers(CHANNEL_PREFIX + channel_id).each do |subscription_id|
redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id|
delete_subscription(subscription_id)
end
redis.del(CHANNEL_PREFIX + channel_id)
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)
end

private
Expand Down Expand Up @@ -239,6 +239,10 @@ def fetch_channel_istate(channel)
channel.connection.socket.istate
end
end

def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end
end
end
end
Expand Down
106 changes: 81 additions & 25 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,75 @@
end

describe ".delete_channel_subscriptions" do
before do
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
end
context "with default config.redis-prefix" do
around do |ex|
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
ex.run
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
end

before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end
before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end

after do
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
end
let(:redis) { AnycableSchema.subscriptions.redis }

let(:redis) { AnycableSchema.subscriptions.redis }
subject do
AnycableSchema.subscriptions.delete_channel_subscriptions(channel)
end

subject do
AnycableSchema.subscriptions.delete_channel_subscriptions(channel)
it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
subject
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
end
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
subject
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
context "with different config.redis-prefix" do
around do |ex|
old_redis_prefix = GraphQL::AnyCable.config.redis_prefix
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
GraphQL::AnyCable.config.redis_prefix = "graphql-test"

ex.run

GraphQL::AnyCable.config.use_client_provided_uniq_id = false
GraphQL::AnyCable.config.redis_prefix = old_redis_prefix
end

before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end

let(:redis) { AnycableSchema.subscriptions.redis }

subject do
AnycableSchema.subscriptions.delete_channel_subscriptions(channel)
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-test-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-test-channel:some-truly-random-number")).to be true
expect(redis.exists?("graphql-test-fingerprints::productUpdated:")).to be true
subject
expect(redis.exists?("graphql-test-channel:some-truly-random-number")).to be false
expect(redis.exists?("graphql-test-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-test-subscription:some-truly-random-number")).to be false
end
end
end

Expand Down Expand Up @@ -204,4 +242,22 @@
)
end
end

describe ".config" do
it "returns the default redis_prefix" do
expect(GraphQL::AnyCable.config.redis_prefix).to eq("graphql")
end

context "when changed redis_prefix" do
after do
GraphQL::AnyCable.config.redis_prefix = "graphql"
end

it "writes a new value to redis_prefix" do
GraphQL::AnyCable.config.redis_prefix = "new-graphql"

expect(GraphQL::AnyCable.config.redis_prefix).to eq("new-graphql")
end
end
end
end

0 comments on commit 8391875

Please sign in to comment.