Skip to content

Commit

Permalink
refactoring, add REAME and specs
Browse files Browse the repository at this point in the history
  • Loading branch information
prog-supdex committed Sep 26, 2023
1 parent 646d680 commit e94d36f
Show file tree
Hide file tree
Showing 15 changed files with 419 additions and 206 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,34 @@ Or you can use `collect`
end
```

## Async delivering messages

If you use `Rails` application or you use `ActiveJob`, you can deliver messages using `ActiveJob`

### Configuration

You have the next configuration

```ruby
GraphQL::AnyCable.configure do |config|
# ... other configurations
config.delivery_method = "inline" # the default value "inline", also can be "active_job"
config.queue = "default" # the name of ActiveJob queue
config.job_class = "GraphQL::Adapters::TriggerJob" # the name executor job
end
```

`delivery_method` can be either `inline` or `active_job`.
`inline` means that delivering messaging will work sync.
`active_job` - It will adds delivering messages operations to `ActiveJob` with queue `default` and using job `GraphQL::Adapters::TriggerJob`

You can change the queue or job_class, buy changing it in configuration

Or you can run code

```ruby
GraphQL::AnyCable.delivery_method("active_job", queue: "broadcasting", job_class: "GraphQL::Adapters::TriggerJob")
```

## Testing applications which use `graphql-anycable`

Expand Down
3 changes: 1 addition & 2 deletions graphql-anycable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ Gem::Specification.new do |spec|
spec.add_dependency "anyway_config", ">= 1.3", "< 3"
spec.add_dependency "graphql", ">= 1.11", "< 3"
spec.add_dependency "redis", ">= 4.2.0"
spec.add_dependency "activejob", ">= 5.2.0"

spec.add_development_dependency "anycable-rails"
spec.add_development_dependency "bundler", "~> 2.0"
spec.add_development_dependency "rack"
spec.add_development_dependency "railties"
spec.add_development_dependency "rake", ">= 12.3.3"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "rspec-rails", "~> 5.0"
spec.add_development_dependency "activejob", ">= 6.0.0"
end
12 changes: 11 additions & 1 deletion lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
require_relative "graphql/anycable/cleaner"
require_relative "graphql/anycable/config"
require_relative "graphql/anycable/railtie" if defined?(Rails)
require_relative "graphql/adapters/base_job"
require_relative "graphql/anycable/stats"
require_relative "graphql/adapters/delivery_adapter"
require_relative "graphql/subscriptions/anycable_subscriptions"

module GraphQL
Expand All @@ -26,6 +26,16 @@ def self.stats(**options)
Stats.new(**options).collect
end

def self.delivery_method(kind, queue: "default", job_class: "GraphQL::Adapters::TriggerJob")
config.delivery_method = kind
config.queue = queue
config.job_class = job_class
end

def self.delivery_adapter(object)
Adapters::DeliveryAdapter.new(executor_object: object)
end

module_function

def redis
Expand Down
30 changes: 0 additions & 30 deletions lib/graphql/adapters/base_job.rb

This file was deleted.

48 changes: 48 additions & 0 deletions lib/graphql/adapters/delivery_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class DeliveryAdapter
DELIVERY_METHOD_INLINE = "inline"
DELIVERY_METHOD_ACTIVE_JOB = "active_job"

attr_reader :executor_object, :executor_method

def initialize(executor_object:)
@executor_object = executor_object
@executor_method = executor_object.class::EXECUTOR_METHOD_NAME
end

def trigger(event_name, args, object, **elements)
if config.delivery_method == DELIVERY_METHOD_INLINE
return executor_object.public_send(executor_method, event_name, args, object, **elements)
end

return if config.delivery_method != DELIVERY_METHOD_ACTIVE_JOB

executor_class_job.set(queue: config.queue).perform_later(
executor_object.collected_arguments,
executor_method,
event_name,
args,
object,
elements
)
end

private

def executor_class_job
custom_job_class = config.job_class

return Adapters::TriggerJob unless custom_job_class

custom_job_class.constantize
end

def config
GraphQL::AnyCable.config
end
end
end
end
24 changes: 24 additions & 0 deletions lib/graphql/adapters/trigger_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class TriggerJob < ActiveJob::Base
def perform(payload, execute_method, event_name, args = {}, object = nil, options = {})
schema = schema_parse(payload)

schema.public_send(execute_method, event_name, args, object, **options)
end

private

def schema_parse(serialized_payload)
payload = serialized_payload

payload[:schema] = payload[:schema].safe_constantize
payload[:serializer] = payload[:serializer].safe_constantize

GraphQL::Subscriptions::AnyCableSubscriptions.new(**payload)
end
end
end
end
3 changes: 1 addition & 2 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class Config < Anyway::Config
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.

attr_config use_async_broadcasting: true
attr_config async_broadcasting: { queue: "broadcasting", class: "GraphQL::Adapters::BaseJob" }
attr_config delivery_method: "inline", queue: "default", job_class: "GraphQL::Adapters::TriggerJob"
end
end
end
7 changes: 7 additions & 0 deletions lib/graphql/anycable/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
module GraphQL
module AnyCable
class Railtie < ::Rails::Railtie
initializer 'graphql_anycable.load_trigger_job' do
#ActiveSupport.on_load(:active_job) do
require "graphql/adapters/trigger_job" if defined?(ActiveJob::Base)
#end
end


rake_tasks do
path = File.expand_path(__dir__)
Dir.glob("#{path}/tasks/**/*.rake").each { |f| load f }
Expand Down
41 changes: 6 additions & 35 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
def_delegators :"GraphQL::AnyCable", :redis, :config
alias_method :trigger_sync, :trigger

attr_reader :collected_arguments

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
Expand All @@ -67,7 +69,7 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
def initialize(serializer: Serialize, **rest)
@serializer = serializer

@serialized_arguments = serialize_arguments(serializer, rest)
@collected_arguments = collect_arguments(serializer, rest)

super
end
Expand Down Expand Up @@ -212,24 +214,11 @@ def delete_channel_subscriptions(channel_or_id)
end

def trigger(event_name, args, object, **elements)
unless config.use_async_broadcasting
return trigger_sync(event_name, args, object, **elements)
end

executor_class_job.perform_later(
serialized_arguments,
EXECUTOR_METHOD_NAME,
event_name,
args,
object,
elements
)
AnyCable.delivery_adapter(self).trigger(event_name, args, object, **elements)
end

private

attr_reader :serialized_arguments

def anycable
@anycable ||= ::AnyCable.broadcast_adapter
end
Expand Down Expand Up @@ -266,31 +255,13 @@ def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def executor_class_job
custom_class = config.async_broadcasting["class"]

return Adapters::BaseJob unless custom_class

Object.const_get(config.async_broadcasting["class"])
end

def perform(event, object)
unless config.use_async_broadcasting
return public_send(EXECUTOR_METHOD_NAME, event, object)
end

args = [Marshal.dump(self), EXECUTOR_METHOD_NAME, Marshal.dump(event), Marshal.dump(object)]

executor_class_job.perform_later(*args)
end

def serialize_arguments(serializer, payload)
def collect_arguments(serializer, payload)
payload = payload.dup

payload[:serializer] = serializer.to_s
payload[:schema] = payload[:schema].to_s

JSON.dump(payload)
payload
end
end
end
Expand Down
100 changes: 0 additions & 100 deletions spec/adapters/base_job_spec.rb

This file was deleted.

Loading

0 comments on commit e94d36f

Please sign in to comment.