From f08db8e89dbd618d015068f4b2763d3a8e556d14 Mon Sep 17 00:00:00 2001 From: prog-supdex Date: Thu, 21 Sep 2023 17:37:32 +0200 Subject: [PATCH] deliver messages asynchronously --- graphql-anycable.gemspec | 1 + lib/graphql-anycable.rb | 1 + lib/graphql/anycable/config.rb | 3 +++ .../subscriptions/adapters/base_job.rb | 17 ++++++++++++ .../subscriptions/anycable_subscriptions.rb | 26 +++++++++++++++++++ 5 files changed, 48 insertions(+) create mode 100644 lib/graphql/subscriptions/adapters/base_job.rb diff --git a/graphql-anycable.gemspec b/graphql-anycable.gemspec index a5496e0..4041b3e 100644 --- a/graphql-anycable.gemspec +++ b/graphql-anycable.gemspec @@ -30,6 +30,7 @@ Gem::Specification.new do |spec| spec.add_dependency "anyway_config", ">= 1.3", "< 3" spec.add_dependency "graphql", ">= 1.11", "< 3" spec.add_dependency "redis", ">= 4.2.0" + spec.add_dependency "activejob", ">= 5.0.0" spec.add_development_dependency "anycable-rails" spec.add_development_dependency "bundler", "~> 2.0" diff --git a/lib/graphql-anycable.rb b/lib/graphql-anycable.rb index 97d18f1..92fa66a 100644 --- a/lib/graphql-anycable.rb +++ b/lib/graphql-anycable.rb @@ -6,6 +6,7 @@ require_relative "graphql/anycable/cleaner" require_relative "graphql/anycable/config" require_relative "graphql/anycable/railtie" if defined?(Rails) +require_relative "graphql/subscriptions/adapters/base_job" require_relative "graphql/anycable/stats" require_relative "graphql/subscriptions/anycable_subscriptions" diff --git a/lib/graphql/anycable/config.rb b/lib/graphql/anycable/config.rb index c422ca6..2816c4c 100644 --- a/lib/graphql/anycable/config.rb +++ b/lib/graphql/anycable/config.rb @@ -12,6 +12,9 @@ class Config < Anyway::Config attr_config use_redis_object_on_cleanup: true attr_config use_client_provided_uniq_id: true attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side. + + attr_config use_async_broadcasting: true + attr_config async_broadcasting: { queue: "broadcasting", class: "GraphQL::Adapters::BaseJob" } end end end diff --git a/lib/graphql/subscriptions/adapters/base_job.rb b/lib/graphql/subscriptions/adapters/base_job.rb new file mode 100644 index 0000000..6c67005 --- /dev/null +++ b/lib/graphql/subscriptions/adapters/base_job.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require "active_job" + +module GraphQL + module Adapters + class BaseJob < ActiveJob::Base + DEFAULT_QUEUE_NAME = :default + + queue_as { GraphQL::AnyCable.config.async_broadcasting["queue"] || DEFAULT_QUEUE_NAME } + + def perform(subscription_object, execute_method, event, object) + Marshal.load(subscription_object).public_send(execute_method, Marshal.load(event), Marshal.load(object)) + end + end + end +end diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 6dec5d8..1dfc291 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -60,6 +60,7 @@ class AnyCableSubscriptions < GraphQL::Subscriptions FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup + EXECUTOR_METHOD_NAME = "execute_synchronically" # method, who execute the main logic # @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)` def initialize(serializer: Serialize, **rest) @@ -73,6 +74,13 @@ def execute_all(event, object) fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1) return if fingerprints.empty? + perform(event, object) + end + + def execute_synchronically(event, object) + fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1) + return if fingerprints.empty? + fingerprint_subscription_ids = Hash[fingerprints.zip( redis.pipelined do |pipeline| fingerprints.map do |fingerprint| @@ -243,6 +251,24 @@ def fetch_channel_istate(channel) def redis_key(prefix) "#{config.redis_prefix}-#{prefix}" end + + def executor_class_job + custom_class = config.async_broadcasting["class"] + + return Adapters::BaseJob unless custom_class + + Object.const_get(config.async_broadcasting["class"]) + end + + def perform(event, object) + unless config.use_async_broadcasting + return public_send(EXECUTOR_METHOD_NAME, event, object) + end + + args = [Marshal.dump(self), EXECUTOR_METHOD_NAME, Marshal.dump(event), Marshal.dump(object)] + + executor_class_job.perform_later(*args) + end end end end