From 1bfbfb0317263be46869150f6673f014e8ef0ae8 Mon Sep 17 00:00:00 2001
From: Eugen Rochko <eugen@zeonfederated.com>
Date: Tue, 25 Oct 2022 01:07:00 +0200
Subject: [PATCH] Add deduplication for JSON payloads in job queue

---
 app/workers/activitypub/delivery_worker.rb   |  2 +-
 app/workers/activitypub/processing_worker.rb |  2 +-
 app/workers/concerns/exponential_backoff.rb  | 18 +++++++
 config/initializers/sidekiq.rb               |  5 ++
 lib/argument_deduplication.rb                | 26 +++++++++++
 lib/argument_deduplication/argument.rb       | 49 ++++++++++++++++++++
 lib/argument_deduplication/client.rb         | 25 ++++++++++
 lib/argument_deduplication/server.rb         | 38 +++++++++++++++
 8 files changed, 163 insertions(+), 2 deletions(-)
 create mode 100644 lib/argument_deduplication.rb
 create mode 100644 lib/argument_deduplication/argument.rb
 create mode 100644 lib/argument_deduplication/client.rb
 create mode 100644 lib/argument_deduplication/server.rb

diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index d9153132b31..cd265a20d12 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -8,7 +8,7 @@ class ActivityPub::DeliveryWorker
   STOPLIGHT_FAILURE_THRESHOLD = 10
   STOPLIGHT_COOLDOWN = 60
 
-  sidekiq_options queue: 'push', retry: 16, dead: false
+  sidekiq_options queue: 'push', retry: 16, dead: false, deduplicate_arguments: 0
 
   HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
 
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb
index 4d06ad07960..3fc97fcd15d 100644
--- a/app/workers/activitypub/processing_worker.rb
+++ b/app/workers/activitypub/processing_worker.rb
@@ -3,7 +3,7 @@
 class ActivityPub::ProcessingWorker
   include Sidekiq::Worker
 
-  sidekiq_options backtrace: true, retry: 8
+  sidekiq_options backtrace: true, retry: 8, deduplicate_arguments: 1
 
   def perform(actor_id, body, delivered_to_account_id = nil, actor_type = 'Account')
     case actor_type
diff --git a/app/workers/concerns/exponential_backoff.rb b/app/workers/concerns/exponential_backoff.rb
index f2b931e3312..3a4ebbc9834 100644
--- a/app/workers/concerns/exponential_backoff.rb
+++ b/app/workers/concerns/exponential_backoff.rb
@@ -4,6 +4,25 @@ module ExponentialBackoff
   extend ActiveSupport::Concern
 
   included do
+    # Sample run; every delay has a random component, so values vary.
+    #  # | Next retry backoff | Total waiting time
+    # ---|--------------------|--------------------
+    #  1 | 32                 | 32
+    #  2 | 320                | 352
+    #  3 | 1022               | 1374
+    #  4 | 3060               | 4434
+    #  5 | 6778               | 11212
+    #  6 | 16088              | 27300
+    #  7 | 37742              | 65042
+    #  8 | 53799              | 118841
+    #  9 | 105677             | 224518
+    # 10 | 129972             | 354490
+    # 11 | 270226             | 624716
+    # 12 | 301127             | 925843
+    # 13 | 287711             | 1213554
+    # 14 | 616223             | 1829777
+    # 15 | 607261             | 2437038
+    # 16 | 1161984            | 3599022
     sidekiq_retry_in do |count|
       15 + 10 * (count**4) + rand(10 * (count**4))
     end
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index c1327053df7..ac96de85d80 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require_relative '../../lib/mastodon/sidekiq_middleware'
+require_relative '../../lib/argument_deduplication'
 
 Sidekiq.configure_server do |config|
   config.redis = REDIS_SIDEKIQ_PARAMS
@@ -10,13 +11,16 @@ Sidekiq.configure_server do |config|
   end
 
   config.server_middleware do |chain|
+    chain.add ArgumentDeduplication::Server
     chain.add SidekiqUniqueJobs::Middleware::Server
   end
 
   config.client_middleware do |chain|
+    chain.add ArgumentDeduplication::Client
     chain.add SidekiqUniqueJobs::Middleware::Client
   end
 
+  ArgumentDeduplication.configure(config)
   SidekiqUniqueJobs::Server.configure(config)
 end
 
@@ -24,6 +28,7 @@ Sidekiq.configure_client do |config|
   config.redis = REDIS_SIDEKIQ_PARAMS
 
   config.client_middleware do |chain|
+    chain.add ArgumentDeduplication::Client
     chain.add SidekiqUniqueJobs::Middleware::Client
   end
 end
diff --git a/lib/argument_deduplication.rb b/lib/argument_deduplication.rb
new file mode 100644
index 00000000000..f271b6f969d
--- /dev/null
+++ b/lib/argument_deduplication.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+require_relative './argument_deduplication/argument'
+require_relative './argument_deduplication/server'
+require_relative './argument_deduplication/client'
+
+# Sidekiq middleware pair that stores one job argument in Redis under
+# its content hash and passes only the hash through the queue.
+module ArgumentDeduplication
+  class CorruptedArgumentError < ::RuntimeError; end
+
+  PREFIX = 'argument_store'
+
+  # The time-to-live is based on the maximum amount of time
+  # a job can possibly spend in the retry queue, assuming
+  # the exponential backoff algorithm and a maximum number
+  # of 16 retries. It is intended as a safe-guard against
+  # any arguments being orphaned due to interruptions.
+  TTL = 50.days.to_i
+
+  # Sidekiq death handler: releases the stored argument of a job that
+  # will not be retried any more. Sidekiq invokes death handlers with
+  # the job hash and the exception, so the lambda has to accept both;
+  # the exception is optional to stay callable with the job alone.
+  DEATH_HANDLER = ->(job, _exception = nil) {
+    argument_index = job['deduplicate_arguments']
+
+    # The queued payload carries the content hash in place of the value
+    # (see Client), and releasing a reference only needs the hash.
+    Argument.new(job['args'][argument_index], nil).pop! if argument_index
+  }.freeze
+
+  # Registers the death handler on the given Sidekiq configuration.
+  def self.configure(config)
+    config.death_handlers << DEATH_HANDLER
+  end
+end
diff --git a/lib/argument_deduplication/argument.rb b/lib/argument_deduplication/argument.rb
new file mode 100644
index 00000000000..60be6bca211
--- /dev/null
+++ b/lib/argument_deduplication/argument.rb
@@ -0,0 +1,65 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+  # A job argument kept in Redis under its SHA-256 content hash,
+  # together with a reference counter stored under the same hash.
+  class Argument
+    attr_reader :content_hash, :value
+
+    # Builds an argument whose content hash is derived from the value.
+    def self.from_value(value)
+      new(Digest::SHA256.base64digest(value), value)
+    end
+
+    def initialize(content_hash, value)
+      @content_hash = content_hash
+      @value = value
+    end
+
+    # Stores the value and registers one more reference to it. Both
+    # keys get their expiry set to TTL.
+    def push!
+      with_redis do |redis|
+        redis.multi do |transaction|
+          transaction.set(value_key, value, ex: TTL)
+          transaction.incr(refcount_key)
+          transaction.expire(refcount_key, TTL)
+        end
+      end
+    end
+
+    # Drops one reference and deletes both keys once no reference is left.
+    def pop!
+      with_redis do |redis|
+        redis.decr(refcount_key)
+
+        redis.watch(refcount_key) do
+          remaining = redis.get(refcount_key).to_i
+
+          if remaining.positive?
+            redis.unwatch
+          else
+            redis.multi do |transaction|
+              transaction.del(refcount_key)
+              transaction.del(value_key)
+            end
+          end
+        end
+      end
+    end
+
+    private
+
+    def value_key
+      "#{PREFIX}:value:#{content_hash}"
+    end
+
+    def refcount_key
+      "#{PREFIX}:refcount:#{content_hash}"
+    end
+
+    def with_redis(&block)
+      Sidekiq.redis(&block)
+    end
+  end
+end
diff --git a/lib/argument_deduplication/client.rb b/lib/argument_deduplication/client.rb
new file mode 100644
index 00000000000..b4c6f599928
--- /dev/null
+++ b/lib/argument_deduplication/client.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+  class Client
+    include Sidekiq::ClientMiddleware
+
+    def call(_worker, job, _queue, _redis_pool)
+      process_arguments!(job)
+      yield
+    end
+
+    private
+
+    def process_arguments!(job)
+      return unless job['deduplicate_arguments']
+
+      argument_index = job['deduplicate_arguments']
+      argument       = Argument.from_value(job['args'][argument_index])
+
+      argument.push!
+
+      job['args'][argument_index] = argument.content_hash
+    end
+  end
+end
diff --git a/lib/argument_deduplication/server.rb b/lib/argument_deduplication/server.rb
new file mode 100644
index 00000000000..0da4d1c6215
--- /dev/null
+++ b/lib/argument_deduplication/server.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module ArgumentDeduplication
+  class Server
+    include Sidekiq::ServerMiddleware
+
+    def call(_worker, job, _queue)
+      argument = process_argument!(job)
+
+      yield
+
+      # If the job completes successfully, we can remove
+      # the argument from the store. If there is an exception,
+      # the job will be retried, so we can't remove the argument
+      # from the store yet. When retries are exhausted, or when
+      # retries are disabled for the worker, the configured death
+      # handler will remove it.
+
+      argument&.pop!
+    end
+
+    private
+
+    def process_argument!(job)
+      return unless job['deduplicate_arguments']
+
+      argument_index = job['deduplicate_arguments']
+      content_hash   = job['args'][argument_index]
+      value          = Sidekiq.redis { |redis| redis.get("#{PREFIX}:value:#{content_hash}") }
+
+      raise CorruptedArgumentError, "The argument for hash #{content_hash} could not be found" if value.nil?
+
+      job['args'][argument_index] = value
+
+      Argument.new(content_hash, value)
+    end
+  end
+end
-- 
GitLab