diff --git a/.rubocop.yml b/.rubocop.yml index c62cbd3..e9786fb 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -24,3 +24,7 @@ AllCops: Style/ArgumentsForwarding: Enabled: false + +Style/GlobalVars: + Exclude: + - "spec/**/*.rb" diff --git a/CHANGELOG.md b/CHANGELOG.md index 349efc7..92856b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## Unreleased +### Changed + +- Redis subscriptions store configuration has been decoupled from AnyCable, so you can use any broadcasting adapter and configure Redis as you like. [@palkan] ([#44](https://github.com/anycable/graphql-anycable/pull/44)) + ## 1.2.0 - 2024-05-07 ### Added diff --git a/README.md b/README.md index 90951ae..7bde3f7 100644 --- a/README.md +++ b/README.md @@ -11,58 +11,52 @@ A (mostly) drop-in replacement for default ActionCable subscriptions adapter shi ## Why? -AnyCable is fast because it does not execute any Ruby code. But default subscription implementation shipped with [graphql gem] requires to do exactly that: re-evaluate GraphQL queries in ActionCable process. AnyCable doesn't support this (it's possible but hard to implement). +AnyCable is fast because it does not execute any Ruby code. But default subscription implementation shipped with [graphql gem] requires to do exactly that: re-evaluate GraphQL queries in Action Cable process. AnyCable doesn't support this (it's possible but hard to implement). See https://github.com/anycable/anycable-rails/issues/40 for more details and discussion. ## Differences - - Subscription information is stored in Redis database configured to be used by AnyCable. Expiration or data cleanup should be configured separately (see below). - - GraphQL queries for all subscriptions are re-executed in the process that triggers event (it may be web server, async jobs, rake tasks or whatever) +- Subscription information is stored in a Redis database. By default, we use AnyCable Redis configuration. Expiration or data cleanup should be configured separately (see below). +- GraphQL queries for all subscriptions are re-executed in the process that triggers event (it may be web server, async jobs, rake tasks or whatever) ## Compatibility - - Should work with ActionCable in development - - Should work without Rails via [LiteCable] - -## Requirements - -AnyCable must be configured with redis broadcast adapter (this is default). +- Works with Action Cable (e.g., in development/test) +- Works without Rails (e.g., via [LiteCable][]) ## Installation Add this line to your application's Gemfile: ```ruby -gem 'graphql-anycable', '~> 1.0' +gem "graphql-anycable", "~> 1.0" ``` And then execute: - $ bundle - -Or install it yourself as: - - $ gem install graphql-anycable +```sh +bundle install +``` ## Usage - 1. Plug it into the schema (replace from ActionCable adapter if you have one): - + 1. Plug it into the schema (replace the Action Cable adapter if you have one): + ```ruby class MySchema < GraphQL::Schema use GraphQL::AnyCable, broadcast: true - + subscription SubscriptionType end ``` - - 2. Execute query in ActionCable/LiteCable channel. - + + 2. Execute a query within an Action Cable/LiteCable channel. + ```ruby class GraphqlChannel < ApplicationCable::Channel def execute(data) - result = + result = MySchema.execute( query: data["query"], context: context, @@ -75,11 +69,11 @@ Or install it yourself as: more: result.subscription?, ) end - + def unsubscribed MySchema.subscriptions.delete_channel_subscriptions(self) end - + private def context @@ -90,29 +84,38 @@ Or install it yourself as: end end ``` - - Make sure that you're passing channel instance as `channel` key to the context. - + + Make sure that you're passing channel instance as `channel` key to the context. + 3. Trigger events as usual: - + ```ruby MySchema.subscriptions.trigger(:product_updated, {}, Product.first!, scope: account.id) ``` + 4. (Optional) When using other AnyCable broadcasting adapters than Redis, you MUST configure Redis for graphql-anycable yourself: + + ```ruby + GraphQL::AnyCable.redis = Redis.new(url: ENV["REDIS_URL"]) + + # you can also use a Proc (e.g., if you want to use a connection pool) + redis_pool = ConnectionPool.new(size: 10) { Redis.new(url: ENV["REDIS_URL"]) } + + GraphQL::AnyCable.redis = ->(&block) { redis_pool.with { |conn| block.call(conn) } } + ``` + ## Broadcasting By default, graphql-anycable evaluates queries and transmits results for every subscription client individually. Of course, it is a waste of resources if you have hundreds or thousands clients subscribed to the same data (and has huge negative impact on performance). Thankfully, GraphQL-Ruby has added [Subscriptions Broadcast](https://graphql-ruby.org/subscriptions/broadcast.html) feature that allows to group exact same subscriptions, execute them and transmit results only once. -To enable this feature, turn on [Interpreter](https://graphql-ruby.org/queries/interpreter.html) and pass `broadcast` option set to `true` to graphql-anycable. +To enable this feature, pass the `broadcast` option set to `true` to graphql-anycable. By default all fields are marked as _not safe for broadcasting_. If a subscription has at least one non-broadcastable field in its query, GraphQL-Ruby will execute every subscription for every client independently. If you sure that all your fields are safe to be broadcasted, you can pass `default_broadcastable` option set to `true` (but be aware that it can have security impllications!) ```ruby class MySchema < GraphQL::Schema - use GraphQL::Execution::Interpreter # Required for graphql-ruby before 1.12. Remove it when upgrading to 2.0 - use GraphQL::Analysis::AST # Required for graphql-ruby before 1.12. Remove it when upgrading to 2.0 use GraphQL::AnyCable, broadcast: true, default_broadcastable: true subscription SubscriptionType @@ -125,7 +128,7 @@ See GraphQL-Ruby [broadcasting docs](https://graphql-ruby.org/subscriptions/broa To avoid filling Redis storage with stale subscription data: - 1. Set `subscription_expiration_seconds` setting to number of seconds (e.g. `604800` for 1 week). See [configuration](#Configuration) section below for details. + 1. Set `subscription_expiration_seconds` setting to number of seconds (e.g. `604800` for 1 week). See [configuration](#configuration) section below for details. 2. Execute `rake graphql:anycable:clean` once in a while to clean up stale subscription data. @@ -165,39 +168,46 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos And any other way provided by [anyway_config]. Check its documentation! ## Emergency actions -In situations when you don't set `subscription_expiration_seconds`, have a lot of inactive subscriptions, and `GraphQL::AnyCable::Cleaner` does`t help in that, + +In situations when you don't set `subscription_expiration_seconds`, have a lot of inactive subscriptions, and `GraphQL::AnyCable::Cleaner` does`t help in that, you can do the following actions for clearing subscriptions 1. Set `config.subscription_expiration_seconds`. After that, the new subscriptions will have `TTL` + 2. Run the script + ```ruby - redis = GraphQL::AnyCable.redis - config = GraphQL::AnyCable.config - - # do it for subscriptions - redis.scan_each("graphql-subscription:*") do |key| - redis.expire(key, config.subscription_expiration_seconds) if redis.ttl(key) < 0 - # or you can just remove it immediately - # redis.del(key) if redis.ttl(key) < 0 - end - - # do it for channels - redis.scan_each("graphql-channel:*") do |key| - redis.expire(key, config.subscription_expiration_seconds) if redis.ttl(key) < 0 - # or you can just remove it immediately - # redis.del(key) if redis.ttl(key) < 0 - end +config = GraphQL::AnyCable.config + +GraphQL::AnyCable.with_redis do |redis| + # do it for subscriptions + redis.scan_each("graphql-subscription:*") do |key| + redis.expire(key, config.subscription_expiration_seconds) if redis.ttl(key) < 0 + # or you can just remove it immediately + # redis.del(key) if redis.ttl(key) < 0 + end + + # do it for channels + redis.scan_each("graphql-channel:*") do |key| + redis.expire(key, config.subscription_expiration_seconds) if redis.ttl(key) < 0 + # or you can just remove it immediately + # redis.del(key) if redis.ttl(key) < 0 + end +end ``` -Or you can change the `redis_prefix` in the `configuration` and then remove all records with the old_prefix -For instance: +Or you can change the `redis_prefix` in the `configuration` and then remove all records with the old_prefix. For instance: + +1. Change the `redis_prefix`. The default `redis_prefix` is `graphql`. + +2. Run the ruby script, which remove all records with `old prefix`: -1. Change the `redis_prefix`. The default `redis_prefix` is `graphql` -2. Run the ruby script, which remove all records with `old prefix` ```ruby +GraphQL::AnyCable.with_redis do |redis| redis.scan_each("graphql-*") do |key| redis.del(key) end +end ``` ## Data model @@ -206,21 +216,21 @@ As in AnyCable there is no place to store subscription data in-memory, it should 1. Grouped event subscriptions: `graphql-fingerprints:#{event.topic}` sorted set. Used to find all subscriptions on `GraphQLSchema.subscriptions.trigger`. - ``` + ```sh ZREVRANGE graphql-fingerprints:1:myStats: 0 -1 => 1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o= ``` 2. Event subscriptions: `graphql-subscriptions:#{event.fingerptint}` set containing identifiers for all subscriptions for given operation with certain context and arguments (serialized in _topic_). Fingerprints are already scoped by topic. - ``` + ```sh SMEMBERS graphql-subscriptions:1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o= => 52ee8d65-275e-4d22-94af-313129116388 ``` 3. Subscription data: `graphql-subscription:#{subscription_id}` hash contains everything required to evaluate subscription on trigger and create data for client. - ``` + ```sh HGETALL graphql-subscription:52ee8d65-275e-4d22-94af-313129116388 => { context: '{"user_id":1,"user":{"__gid__":"Z2lkOi8vZWJheS1tYWcyL1VzZXIvMQ"}}', @@ -232,29 +242,29 @@ As in AnyCable there is no place to store subscription data in-memory, it should 4. Channel subscriptions: `graphql-channel:#{subscription_id}` set containing identifiers for subscriptions created in ActionCable channel to delete them on client disconnect. - ``` + ```sh SMEMBERS graphql-channel:17420c6ed9e => 52ee8d65-275e-4d22-94af-313129116388 ``` ## Stats -You can grab Redis subscription statistics by calling +You can grab Redis subscription statistics by calling: ```ruby - GraphQL::AnyCable.stats +GraphQL::AnyCable.stats ``` -It will return a total of the amount of the key with the following prefixes +It will return a total of the amount of the key with the following prefixes: -``` - graphql-subscription - graphql-fingerprints - graphql-subscriptions - graphql-channel +```txt +graphql-subscription +graphql-fingerprints +graphql-subscriptions +graphql-channel ``` -The response will look like this +The response will look like this: ```json { @@ -267,13 +277,13 @@ The response will look like this } ``` -You can also grab the number of subscribers grouped by subscriptions +You can also grab the number of subscribers grouped by subscriptions: ```ruby - GraphQL::AnyCable.stats(include_subscriptions: true) +GraphQL::AnyCable.stats(include_subscriptions: true) ``` -It will return the response that contains `subscriptions` +It will return the response that contains `subscriptions`: ```json { @@ -290,59 +300,58 @@ It will return the response that contains `subscriptions` } ``` -Also, you can set another `scan_count`, if needed. -The default value is 1_000 +Also, you can set another `scan_count`, if needed. The default value is 1_000: ```ruby - GraphQL::AnyCable.stats(scan_count: 100) +GraphQL::AnyCable.stats(scan_count: 100) ``` -We can set statistics data to [Yabeda][] for tracking amount of subscriptions +We can set statistics data to [Yabeda][] for tracking amount of subscriptions: ```ruby - # config/initializers/metrics.rb - Yabeda.configure do - group :graphql_anycable_statistics do - gauge :subscriptions_count, comment: "Number of graphql-anycable subscriptions" - end +# config/initializers/metrics.rb +Yabeda.configure do + group :graphql_anycable_statistics do + gauge :subscriptions_count, comment: "Number of graphql-anycable subscriptions" end +end ``` ```ruby - # in your app - statistics = GraphQL::AnyCable.stats[:total] +# in your app +statistics = GraphQL::AnyCable.stats[:total] - statistics.each do |key , value| - Yabeda.graphql_anycable_statistics.subscriptions_count.set({name: key}, value) - end +statistics.each do |key , value| + Yabeda.graphql_anycable_statistics.subscriptions_count.set({name: key}, value) +end ``` -Or you can use `collect` +Or you can use `collect`: + ```ruby - # config/initializers/metrics.rb - Yabeda.configure do - group :graphql_anycable_statistics do - gauge :subscriptions_count, comment: "Number of graphql-anycable subscriptions" - end - - collect do - statistics = GraphQL::AnyCable.stats[:total] +# config/initializers/metrics.rb +Yabeda.configure do + group :graphql_anycable_statistics do + gauge :subscriptions_count, comment: "Number of graphql-anycable subscriptions" + end - statistics.each do |redis_prefix, value| - graphql_anycable_statistics.subscriptions_count.set({name: redis_prefix}, value) - end + collect do + statistics = GraphQL::AnyCable.stats[:total] + + statistics.each do |redis_prefix, value| + graphql_anycable_statistics.subscriptions_count.set({name: redis_prefix}, value) end end +end ``` - ## Testing applications which use `graphql-anycable` You can pass custom redis-server URL to AnyCable using ENV variable. - ```bash - REDIS_URL=redis://localhost:6379/5 bundle exec rspec - ``` +```bash +REDIS_URL=redis://localhost:6379/5 bundle exec rspec +``` ## Development @@ -382,7 +391,6 @@ To install this gem onto your local machine, run `bundle exec rake install`. To 7. GitHub Actions will create a new release, build and push gem into [rubygems.org](https://rubygems.org)! You're done! - ## Contributing Bug reports and pull requests are welcome on GitHub at https://github.com/Envek/graphql-anycable. diff --git a/lib/graphql-anycable.rb b/lib/graphql-anycable.rb index 35fafeb..571e195 100644 --- a/lib/graphql-anycable.rb +++ b/lib/graphql-anycable.rb @@ -11,33 +11,55 @@ module GraphQL module AnyCable - def self.use(schema, **opts) - schema.use(GraphQL::Subscriptions::AnyCableSubscriptions, **opts) - end + class << self + def use(schema, **opts) + schema.use(GraphQL::Subscriptions::AnyCableSubscriptions, **opts) + end - def self.stats(**opts) - Stats.new(**opts).collect - end + def stats(**opts) + Stats.new(**opts).collect + end + + def redis + warn "Usage of `GraphQL::AnyCable.redis` is deprecated. Instead of `GraphQL::AnyCable.redis.whatever` use `GraphQL::AnyCable.with_redis { |redis| redis.whatever }`" + @redis ||= with_redis { |conn| conn } + end + + def redis=(connector) + @redis_connector = if connector.is_a?(::Proc) + connector + else + ->(&block) { block.call connector } + end + end + + def with_redis(&block) + @redis_connector || default_redis_connector + @redis_connector.call(&block) + end + + def config + @config ||= Config.new + end + + def configure + yield(config) if block_given? + end - module_function + private - def redis - @redis ||= begin + def default_redis_connector adapter = ::AnyCable.broadcast_adapter unless adapter.is_a?(::AnyCable::BroadcastAdapters::Redis) raise "Unsupported AnyCable adapter: #{adapter.class}. " \ - "graphql-anycable works only with redis broadcast adapter." + "Please, configure Redis connector manually:\n\n" \ + " GraphQL::AnyCable.configure do |config|\n" \ + " config.redis = Redis.new(url: 'redis://localhost:6379/0')\n" \ + " end\n" end - ::AnyCable.broadcast_adapter.redis_conn - end - end - def config - @config ||= GraphQL::AnyCable::Config.new - end - - def configure - yield(config) if block_given? + self.redis = ::AnyCable.broadcast_adapter.redis_conn + end end end end diff --git a/lib/graphql/anycable/stats.rb b/lib/graphql/anycable/stats.rb index 458d5f8..e54703e 100644 --- a/lib/graphql/anycable/stats.rb +++ b/lib/graphql/anycable/stats.rb @@ -18,12 +18,14 @@ def initialize(scan_count: SCAN_COUNT_RECORDS_AMOUNT, include_subscriptions: fal def collect total_subscriptions_result = {total: {}} - list_prefixes_keys.each do |name, prefix| - total_subscriptions_result[:total][name] = count_by_scan(match: "#{prefix}*") - end + AnyCable.with_redis do |redis| + list_prefixes_keys.each do |name, prefix| + total_subscriptions_result[:total][name] = count_by_scan(redis, match: "#{prefix}*") + end - if include_subscriptions - total_subscriptions_result[:subscriptions] = group_subscription_stats + if include_subscriptions + total_subscriptions_result[:subscriptions] = group_subscription_stats(redis) + end end total_subscriptions_result @@ -32,7 +34,7 @@ def collect private # Counting all keys, that match the pattern with iterating by count - def count_by_scan(match:) + def count_by_scan(redis, match:) sb_amount = 0 cursor = "0" @@ -47,7 +49,7 @@ def count_by_scan(match:) end # Calculate subscribes, grouped by subscriptions - def group_subscription_stats + def group_subscription_stats(redis) subscription_groups = {} redis.scan_each(match: "#{list_prefixes_keys[:fingerprints]}*", count: scan_count) do |fingerprint_key| @@ -79,10 +81,6 @@ def adapter GraphQL::Subscriptions::AnyCableSubscriptions end - def redis - GraphQL::AnyCable.redis - end - def config GraphQL::AnyCable.config end diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 60b8805..34c3669 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -54,7 +54,8 @@ class Subscriptions class AnyCableSubscriptions < GraphQL::Subscriptions extend Forwardable - def_delegators :"GraphQL::AnyCable", :redis, :config + def_delegators :"GraphQL::AnyCable", :with_redis, :config + def_delegators :"::AnyCable", :broadcast SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, … FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic @@ -70,16 +71,18 @@ def initialize(serializer: Serialize, **rest) # An event was triggered. # Re-evaluate all subscribed queries and push the data over ActionCable. def execute_all(event, object) - fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1) + fingerprints = with_redis { |redis| redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1) } return if fingerprints.empty? - fingerprint_subscription_ids = fingerprints.zip( - redis.pipelined do |pipeline| - fingerprints.map do |fingerprint| - pipeline.smembers(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint) + fingerprint_subscription_ids = with_redis do |redis| + fingerprints.zip( + redis.pipelined do |pipeline| + fingerprints.map do |fingerprint| + pipeline.smembers(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint) + end end - end - ).to_h + ).to_h + end fingerprint_subscription_ids.each do |fingerprint, subscription_ids| execute_grouped(fingerprint, subscription_ids, event, object) @@ -94,7 +97,7 @@ def execute_all(event, object) def execute_grouped(fingerprint, subscription_ids, event, object) return if subscription_ids.empty? - subscription_id = subscription_ids.find { |sid| redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) } + subscription_id = with_redis { |redis| subscription_ids.find { |sid| redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) } } return unless subscription_id # All subscriptions has expired but haven't cleaned up yet result = execute_update(subscription_id, event, object) @@ -115,7 +118,7 @@ def execute(subscription_id, event, object) # @param result [#to_h] result to send to clients def deliver(stream_key, result) payload = {result: result.to_h, more: true}.to_json - anycable.broadcast(stream_key, payload) + broadcast(stream_key, payload) end # Save query to "storage" (in redis) @@ -141,34 +144,55 @@ def write_subscription(query, events) events: events.map { |e| [e.topic, e.fingerprint] }.to_h.to_json } - redis.multi do |pipeline| - pipeline.sadd(redis_key(CHANNEL_PREFIX) + subscription_id, [subscription_id]) - pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data) - events.each do |event| - pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint) - pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id]) + with_redis do |redis| + redis.multi do |pipeline| + pipeline.sadd(redis_key(CHANNEL_PREFIX) + subscription_id, [subscription_id]) + pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data) + events.each do |event| + pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint) + pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id]) + end + next unless config.subscription_expiration_seconds + pipeline.expire(redis_key(CHANNEL_PREFIX) + subscription_id, config.subscription_expiration_seconds) + pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds) end - next unless config.subscription_expiration_seconds - pipeline.expire(redis_key(CHANNEL_PREFIX) + subscription_id, config.subscription_expiration_seconds) - pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds) end end # Return the query from "storage" (in redis) def read_subscription(subscription_id) - redis.mapped_hmget( - "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}", - :query_string, :variables, :context, :operation_name - ).tap do |subscription| - next if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key - - subscription[:context] = @serializer.load(subscription[:context]) - subscription[:variables] = JSON.parse(subscription[:variables]) - subscription[:operation_name] = nil if subscription[:operation_name].strip == "" + with_redis do |redis| + redis.mapped_hmget( + "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}", + :query_string, :variables, :context, :operation_name + ).tap do |subscription| + next if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key + + subscription[:context] = @serializer.load(subscription[:context]) + subscription[:variables] = JSON.parse(subscription[:variables]) + subscription[:operation_name] = nil if subscription[:operation_name].strip == "" + end end end - def delete_subscription(subscription_id) + # The channel was closed, forget about it and its subscriptions + def delete_channel_subscriptions(channel) + raise(ArgumentError, "Please pass channel instance to #{__method__} in your #unsubscribed method") if channel.is_a?(String) + + channel_id = read_subscription_id(channel) + + # Missing in case disconnect happens before #execute + return unless channel_id + + with_redis do |redis| + redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id| + delete_subscription(subscription_id, redis: redis) + end + redis.del(redis_key(CHANNEL_PREFIX) + channel_id) + end + end + + def delete_subscription(subscription_id, redis: AnyCable.redis) events = redis.hget(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, :events) events = events ? JSON.parse(events) : {} fingerprint_subscriptions = {} @@ -189,27 +213,8 @@ def delete_subscription(subscription_id) end end - # The channel was closed, forget about it and its subscriptions - def delete_channel_subscriptions(channel) - raise(ArgumentError, "Please pass channel instance to #{__method__} in your #unsubscribed method") if channel.is_a?(String) - - channel_id = read_subscription_id(channel) - - # Missing in case disconnect happens before #execute - return unless channel_id - - redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id| - delete_subscription(subscription_id) - end - redis.del(redis_key(CHANNEL_PREFIX) + channel_id) - end - private - def anycable - @anycable ||= ::AnyCable.broadcast_adapter - end - def read_subscription_id(channel) return channel.instance_variable_get(:@__sid__) if channel.instance_variable_defined?(:@__sid__) diff --git a/spec/graphql/anycable_spec.rb b/spec/graphql/anycable_spec.rb index caac7e7..e6c7862 100644 --- a/spec/graphql/anycable_spec.rb +++ b/spec/graphql/anycable_spec.rb @@ -30,8 +30,6 @@ double("Channel", id: "legacy_id", params: {"channelId" => "legacy_id"}, stream_from: nil, connection: connection) end - let(:anycable) { AnyCable.broadcast_adapter } - let(:subscription_id) do "some-truly-random-number" end @@ -41,7 +39,7 @@ end before do - allow(anycable).to receive(:broadcast) + allow(AnyCable).to receive(:broadcast) allow_any_instance_of(GraphQL::Subscriptions::Event).to receive(:fingerprint).and_return(fingerprint) allow_any_instance_of(GraphQL::Subscriptions).to receive(:build_id).and_return("ohmycables") end @@ -54,7 +52,7 @@ it "broadcasts message when event is being triggered" do subject AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"}) - expect(anycable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) + expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end context "with multiple subscriptions in one query" do @@ -71,7 +69,7 @@ it "broadcasts message only for update event" do subject AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"}) - expect(anycable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) + expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end end @@ -86,7 +84,7 @@ subject AnycableSchema.subscriptions.trigger(:product_created, {}, {id: 1, title: "Gravizapa"}) - expect(anycable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) + expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end end end @@ -124,7 +122,7 @@ ) end - let(:redis) { AnycableSchema.subscriptions.redis } + let(:redis) { $redis } subject do AnycableSchema.subscriptions.delete_channel_subscriptions(channel) @@ -160,7 +158,7 @@ ) end - let(:redis) { AnycableSchema.subscriptions.redis } + let(:redis) { $redis } subject do AnycableSchema.subscriptions.delete_channel_subscriptions(channel) diff --git a/spec/graphql/broadcast_spec.rb b/spec/graphql/broadcast_spec.rb index 7173485..bfbfd5e 100644 --- a/spec/graphql/broadcast_spec.rb +++ b/spec/graphql/broadcast_spec.rb @@ -26,11 +26,9 @@ def subscribe(query) GRAPHQL end - let(:anycable) { AnyCable.broadcast_adapter } - before do allow(channel).to receive(:stream_from) - allow(anycable).to receive(:broadcast) + allow(AnyCable).to receive(:broadcast) end context "when all clients asks for broadcastable fields only" do @@ -44,7 +42,7 @@ def subscribe(query) 2.times { subscribe(query) } BroadcastSchema.subscriptions.trigger(:post_created, {}, object) expect(object).to have_received(:title).once - expect(anycable).to have_received(:broadcast).once + expect(AnyCable).to have_received(:broadcast).once end end @@ -59,7 +57,7 @@ def subscribe(query) 2.times { subscribe(query) } BroadcastSchema.subscriptions.trigger(:post_created, {}, object) expect(object).to have_received(:title).twice - expect(anycable).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice end end @@ -70,7 +68,7 @@ def subscribe(query) GRAPHQL end - let(:redis) { AnycableSchema.subscriptions.redis } + let(:redis) { $redis } it "doesn't fail" do 3.times { subscribe(query) } @@ -78,7 +76,7 @@ def subscribe(query) expect(redis.keys("graphql-subscription:*").size).to eq(2) expect { BroadcastSchema.subscriptions.trigger(:post_created, {}, object) }.not_to raise_error expect(object).to have_received(:title).once - expect(anycable).to have_received(:broadcast).once + expect(AnyCable).to have_received(:broadcast).once end end end diff --git a/spec/integrations/broadcastable_subscriptions_spec.rb b/spec/integrations/broadcastable_subscriptions_spec.rb index d37093d..74c4d60 100644 --- a/spec/integrations/broadcastable_subscriptions_spec.rb +++ b/spec/integrations/broadcastable_subscriptions_spec.rb @@ -25,7 +25,7 @@ subject { handler.handle(:command, request) } - before { allow(AnyCable.broadcast_adapter).to receive(:broadcast) } + before { allow(AnyCable).to receive(:broadcast) } describe "execute" do it "responds with result" do @@ -59,8 +59,27 @@ end end - describe "unsubscribe" do - let(:redis) { AnycableSchema.subscriptions.redis } + describe "unsubscribe + custom Redis connector" do + let(:custom_redis_url) { REDIS_TEST_DB_URL.sub(/(\/\d+\/?)?$/, "/5") } + + let(:redis_connections) do + Array.new(2) { Redis.new(url: custom_redis_url) } + end + + let(:redis_enumerator) do + redis_connections.cycle + end + + before do + GraphQL::AnyCable.redis = ->(&block) { block.call redis_enumerator.next } + end + + after do + GraphQL::AnyCable.remove_instance_variable(:@redis_connector) + redis.flushdb + end + + let(:redis) { Redis.new(url: custom_redis_url) } specify "removes subscription from the store" do # first, subscribe to obtain the connection state @@ -121,7 +140,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(1) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).once + expect(AnyCable).to have_received(:broadcast).once first_state = response.istate @@ -136,7 +155,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(1) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice second_state = response_2.istate @@ -151,7 +170,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(0) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice end context "without subscription" do diff --git a/spec/integrations/per_client_subscriptions_spec.rb b/spec/integrations/per_client_subscriptions_spec.rb index 7ec56cf..3c96591 100644 --- a/spec/integrations/per_client_subscriptions_spec.rb +++ b/spec/integrations/per_client_subscriptions_spec.rb @@ -25,7 +25,7 @@ subject { handler.handle(:command, request) } - before { allow(AnyCable.broadcast_adapter).to receive(:broadcast) } + before { allow(AnyCable).to receive(:broadcast) } describe "execute" do it "responds with result" do @@ -62,7 +62,7 @@ end describe "unsubscribe" do - let(:redis) { AnycableSchema.subscriptions.redis } + let(:redis) { $redis } specify "removes subscription from the store" do # first, subscribe to obtain the connection state @@ -102,7 +102,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(2) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice first_state = response.istate @@ -117,7 +117,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(1) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).thrice + expect(AnyCable).to have_received(:broadcast).thrice second_state = response_2.istate @@ -132,7 +132,7 @@ expect(redis.keys("graphql-subscriptions:*").size).to eq(0) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).thrice + expect(AnyCable).to have_received(:broadcast).thrice end context "without subscription" do diff --git a/spec/integrations/rails_spec.rb b/spec/integrations/rails_spec.rb index cb94c66..030d18e 100644 --- a/spec/integrations/rails_spec.rb +++ b/spec/integrations/rails_spec.rb @@ -94,7 +94,7 @@ def context subject { handler.handle(:command, request) } - before { allow(AnyCable.broadcast_adapter).to receive(:broadcast) } + before { allow(AnyCable).to receive(:broadcast) } let(:query) do <<~GQL @@ -108,7 +108,7 @@ def context GQL end - let(:redis) { AnycableSchema.subscriptions.redis } + let(:redis) { $redis } it "execute multiple clients + trigger + disconnect one by one" do # first, subscribe to obtain the connection state @@ -129,7 +129,7 @@ def context expect(redis.keys("graphql-subscriptions:*").size).to eq(1) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).once + expect(AnyCable).to have_received(:broadcast).once first_state = response.istate @@ -144,7 +144,7 @@ def context expect(redis.keys("graphql-subscriptions:*").size).to eq(1) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice second_state = response_2.istate @@ -164,6 +164,6 @@ def context expect(redis.keys("graphql-subscriptions:*").size).to eq(0) schema.subscriptions.trigger(:post_updated, {id: "a"}, POSTS.first) - expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice + expect(AnyCable).to have_received(:broadcast).twice end end diff --git a/spec/redis_helper.rb b/spec/redis_helper.rb index 28ddfb1..a42aff6 100644 --- a/spec/redis_helper.rb +++ b/spec/redis_helper.rb @@ -1,17 +1,15 @@ # frozen_string_literal: true -REDIS_TEST_DB_URL = "redis://localhost:6379/6" +REDIS_TEST_DB_URL = ENV.fetch("REDIS_URL", "redis://localhost:6379/6") -def setup_redis_test_db - test_url = ENV.fetch("REDIS_URL", REDIS_TEST_DB_URL) - channel = AnyCable.broadcast_adapter.channel - AnyCable.broadcast_adapter = :redis, {url: test_url, channel: channel} -end +channel = AnyCable.broadcast_adapter.channel + +AnyCable.broadcast_adapter = :redis, {url: REDIS_TEST_DB_URL, channel: channel} -setup_redis_test_db +$redis = Redis.new(url: REDIS_TEST_DB_URL) RSpec.configure do |config| config.before do - GraphQL::AnyCable.redis.flushdb + GraphQL::AnyCable.with_redis(&:flushdb) end end