From 1f09bf0c8648d8c1282145a7f3f6f441d96a14d2 Mon Sep 17 00:00:00 2001
From: Eugen Rochko <eugen@zeonfederated.com>
Date: Thu, 10 Feb 2022 20:44:03 +0100
Subject: [PATCH] Fix redis pipeline usage
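
Recent versions of the redis gem deprecate issuing commands on the
client connection from inside a `pipelined` block; the block now
yields a pipeline object and commands must be sent to it instead.
Switch every `redis.pipelined do ... end` block to
`redis.pipelined do |pipeline| ... end` and queue the commands on the
yielded pipeline.

Methods that run inside a pipeline (`add_to_and_remove_from_subsets`,
`unpush_from_public_timelines`) now receive the connection or pipeline
explicitly instead of reaching for the shared `redis` helper, so the
caller decides whether the commands are pipelined.

Illustrative sketch of the pattern (placeholder key, not part of the
diff below):

    # Deprecated: commands are sent to the client while a pipeline is open
    redis.pipelined do
      redis.del('some:key')
    end

    # Fixed: commands are sent to the yielded pipeline object
    redis.pipelined do |pipeline|
      pipeline.del('some:key')
    end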

---
 app/lib/feed_manager.rb                       | 12 ++++++------
 .../follow_recommendation_suppression.rb      |  4 ++--
 app/models/trends/base.rb                     |  3 ++-
 app/models/trends/history.rb                  | 10 +++++-----
 app/models/trends/links.rb                    | 10 +++++-----
 app/models/trends/statuses.rb                 | 10 +++++-----
 app/models/trends/tags.rb                     |  2 +-
 app/services/batched_remove_status_service.rb | 19 ++++++++++---------
 8 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb
index 46a55c7a439..c060b5a9b6f 100644
--- a/app/lib/feed_manager.rb
+++ b/app/lib/feed_manager.rb
@@ -271,24 +271,24 @@ class FeedManager
   def clean_feeds!(type, ids)
     reblogged_id_sets = {}
 
-    redis.pipelined do
+    redis.pipelined do |pipeline|
       ids.each do |feed_id|
-        redis.del(key(type, feed_id))
+        pipeline.del(key(type, feed_id))
         reblog_key = key(type, feed_id, 'reblogs')
         # We collect a future for this: we don't block while getting
         # it, but we can iterate over it later.
-        reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
-        redis.del(reblog_key)
+        reblogged_id_sets[feed_id] = pipeline.zrange(reblog_key, 0, -1)
+        pipeline.del(reblog_key)
       end
     end
 
     # Remove all of the reblog tracking keys we just removed the
     # references to.
-    redis.pipelined do
+    redis.pipelined do |pipeline|
       reblogged_id_sets.each do |feed_id, future|
         future.value.each do |reblogged_id|
           reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}")
-          redis.del(reblog_set_key)
+          pipeline.del(reblog_set_key)
         end
       end
     end
diff --git a/app/models/follow_recommendation_suppression.rb b/app/models/follow_recommendation_suppression.rb
index 170506b853f..3b503c9c706 100644
--- a/app/models/follow_recommendation_suppression.rb
+++ b/app/models/follow_recommendation_suppression.rb
@@ -19,9 +19,9 @@ class FollowRecommendationSuppression < ApplicationRecord
   private
 
   def remove_follow_recommendations
-    redis.pipelined do
+    redis.pipelined do |pipeline|
       I18n.available_locales.each do |locale|
-        redis.zrem("follow_recommendations:#{locale}", account_id)
+        pipeline.zrem("follow_recommendations:#{locale}", account_id)
       end
     end
   end
diff --git a/app/models/trends/base.rb b/app/models/trends/base.rb
index 7ed13228dfd..199a55e6fa9 100644
--- a/app/models/trends/base.rb
+++ b/app/models/trends/base.rb
@@ -73,10 +73,11 @@ class Trends::Base
     redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
   end
 
+  # @param [Redis] redis
   # @param [Integer] id
   # @param [Float] score
   # @param [Hash<String, Boolean>] subsets
-  def add_to_and_remove_from_subsets(id, score, subsets = {})
+  def add_to_and_remove_from_subsets(redis, id, score, subsets = {})
     subsets.each_key do |subset|
       key = [key_prefix, subset].compact.join(':')
 
diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb
index 608e337924d..0ee21cee0ca 100644
--- a/app/models/trends/history.rb
+++ b/app/models/trends/history.rb
@@ -41,11 +41,11 @@ class Trends::History
     end
 
     def add(account_id)
-      redis.pipelined do
-        redis.incrby(key_for(:uses), 1)
-        redis.pfadd(key_for(:accounts), account_id)
-        redis.expire(key_for(:uses), EXPIRE_AFTER)
-        redis.expire(key_for(:accounts), EXPIRE_AFTER)
+      redis.pipelined do |pipeline|
+        pipeline.incrby(key_for(:uses), 1)
+        pipeline.pfadd(key_for(:accounts), account_id)
+        pipeline.expire(key_for(:uses), EXPIRE_AFTER)
+        pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
       end
     end
 
diff --git a/app/models/trends/links.rb b/app/models/trends/links.rb
index 62308e7067b..b75b4007764 100644
--- a/app/models/trends/links.rb
+++ b/app/models/trends/links.rb
@@ -88,14 +88,14 @@ class Trends::Links < Trends::Base
 
       decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 
-      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
+      add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, {
         all: true,
         allowed: preview_card.trendable?,
       })
 
       next unless valid_locale?(preview_card.language)
 
-      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
+      add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, {
         "all:#{preview_card.language}" => true,
         "allowed:#{preview_card.language}" => preview_card.trendable?,
       })
@@ -105,10 +105,10 @@ class Trends::Links < Trends::Base
     # set. We do this instead of just deleting the localized sets to avoid
     # having moments where the API returns empty results
 
-    redis.pipelined do
+    redis.pipelined do |pipeline|
       Trends.available_locales.each do |locale|
-        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
-        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
       end
     end
   end
diff --git a/app/models/trends/statuses.rb b/app/models/trends/statuses.rb
index e785413ec47..744fcfb938e 100644
--- a/app/models/trends/statuses.rb
+++ b/app/models/trends/statuses.rb
@@ -97,7 +97,7 @@ class Trends::Statuses < Trends::Base
   end
 
   def calculate_scores(statuses, at_time)
-    redis.pipelined do
+    redis.pipelined do |pipeline|
       statuses.each do |status|
         expected  = 1.0
         observed  = (status.reblogs_count + status.favourites_count).to_f
@@ -112,14 +112,14 @@ class Trends::Statuses < Trends::Base
 
         decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
 
-        add_to_and_remove_from_subsets(status.id, decaying_score, {
+        add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, {
           all: true,
           allowed: status.trendable? && status.account.discoverable?,
         })
 
         next unless valid_locale?(status.language)
 
-        add_to_and_remove_from_subsets(status.id, decaying_score, {
+        add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, {
           "all:#{status.language}" => true,
           "allowed:#{status.language}" => status.trendable? && status.account.discoverable?,
         })
@@ -130,8 +130,8 @@ class Trends::Statuses < Trends::Base
       # having moments where the API returns empty results
 
       Trends.available_locales.each do |locale|
-        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
-        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
       end
     end
   end
diff --git a/app/models/trends/tags.rb b/app/models/trends/tags.rb
index 3caa58815d2..ba2a8560f0c 100644
--- a/app/models/trends/tags.rb
+++ b/app/models/trends/tags.rb
@@ -79,7 +79,7 @@ class Trends::Tags < Trends::Base
 
       decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 
-      add_to_and_remove_from_subsets(tag.id, decaying_score, {
+      add_to_and_remove_from_subsets(redis, tag.id, decaying_score, {
         all: true,
         allowed: tag.trendable?,
       })
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index 5000062e41a..9d106801d55 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -47,9 +47,10 @@ class BatchedRemoveStatusService < BaseService
 
     # Cannot be batched
     @status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago)
-    redis.pipelined do
+
+    redis.pipelined do |pipeline|
       statuses.each do |status|
-        unpush_from_public_timelines(status)
+        unpush_from_public_timelines(pipeline, status)
       end
     end
   end
@@ -72,22 +73,22 @@ class BatchedRemoveStatusService < BaseService
     end
   end
 
-  def unpush_from_public_timelines(status)
+  def unpush_from_public_timelines(pipeline, status)
     return unless status.public_visibility? && status.id > @status_id_cutoff
 
     payload = Oj.dump(event: :delete, payload: status.id.to_s)
 
-    redis.publish('timeline:public', payload)
-    redis.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
+    pipeline.publish('timeline:public', payload)
+    pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
 
     if status.media_attachments.any?
-      redis.publish('timeline:public:media', payload)
-      redis.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
+      pipeline.publish('timeline:public:media', payload)
+      pipeline.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
     end
 
     status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
-      redis.publish("timeline:hashtag:#{hashtag}", payload)
-      redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
+      pipeline.publish("timeline:hashtag:#{hashtag}", payload)
+      pipeline.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
     end
   end
 end
-- 
GitLab