diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 46a55c7a4399a353d2fddd91786d3ef0056fed71..c060b5a9b6fc91622f2068393dfb826cef82d6e9 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -271,24 +271,24 @@ class FeedManager def clean_feeds!(type, ids) reblogged_id_sets = {} - redis.pipelined do + redis.pipelined do |pipeline| ids.each do |feed_id| - redis.del(key(type, feed_id)) + pipeline.del(key(type, feed_id)) reblog_key = key(type, feed_id, 'reblogs') # We collect a future for this: we don't block while getting # it, but we can iterate over it later. - reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1) - redis.del(reblog_key) + reblogged_id_sets[feed_id] = pipeline.zrange(reblog_key, 0, -1) + pipeline.del(reblog_key) end end # Remove all of the reblog tracking keys we just removed the # references to. - redis.pipelined do + redis.pipelined do |pipeline| reblogged_id_sets.each do |feed_id, future| future.value.each do |reblogged_id| reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}") - redis.del(reblog_set_key) + pipeline.del(reblog_set_key) end end end diff --git a/app/models/follow_recommendation_suppression.rb b/app/models/follow_recommendation_suppression.rb index 170506b853f26a870fed7fb5026dec8e4e9b6887..3b503c9c70699afe9c4f6b457de59168e561850e 100644 --- a/app/models/follow_recommendation_suppression.rb +++ b/app/models/follow_recommendation_suppression.rb @@ -19,9 +19,9 @@ class FollowRecommendationSuppression < ApplicationRecord private def remove_follow_recommendations - redis.pipelined do + redis.pipelined do |pipeline| I18n.available_locales.each do |locale| - redis.zrem("follow_recommendations:#{locale}", account_id) + pipeline.zrem("follow_recommendations:#{locale}", account_id) end end end diff --git a/app/models/trends/base.rb b/app/models/trends/base.rb index 7ed13228dfd66842864a1d2e692aea46b6194972..199a55e6fa9cdf59537102d6de63d9ef72cdeb81 100644 --- a/app/models/trends/base.rb +++ b/app/models/trends/base.rb @@ -73,10 +73,11 @@ class Trends::Base redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0 end + # @param [Redis] redis # @param [Integer] id # @param [Float] score # @param [Hash<String, Boolean>] subsets - def add_to_and_remove_from_subsets(id, score, subsets = {}) + def add_to_and_remove_from_subsets(redis, id, score, subsets = {}) subsets.each_key do |subset| key = [key_prefix, subset].compact.join(':') diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb index 608e337924d469db0aa48f1b7bc818fb02db3a8b..0ee21cee0ca6f8bd7ea6ceb7d1ab9a66f7843af7 100644 --- a/app/models/trends/history.rb +++ b/app/models/trends/history.rb @@ -41,11 +41,11 @@ class Trends::History end def add(account_id) - redis.pipelined do - redis.incrby(key_for(:uses), 1) - redis.pfadd(key_for(:accounts), account_id) - redis.expire(key_for(:uses), EXPIRE_AFTER) - redis.expire(key_for(:accounts), EXPIRE_AFTER) + redis.pipelined do |pipeline| + pipeline.incrby(key_for(:uses), 1) + pipeline.pfadd(key_for(:accounts), account_id) + pipeline.expire(key_for(:uses), EXPIRE_AFTER) + pipeline.expire(key_for(:accounts), EXPIRE_AFTER) end end diff --git a/app/models/trends/links.rb b/app/models/trends/links.rb index 62308e7067bbade3ff1efe8c60b708df9da5793d..b75b4007764cfb86faaec635c98d3d55e986b107 100644 --- a/app/models/trends/links.rb +++ b/app/models/trends/links.rb @@ -88,14 +88,14 @@ class Trends::Links < Trends::Base decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f)) - add_to_and_remove_from_subsets(preview_card.id, decaying_score, { + add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, { all: true, allowed: preview_card.trendable?, }) next unless valid_locale?(preview_card.language) - add_to_and_remove_from_subsets(preview_card.id, decaying_score, { + add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, { "all:#{preview_card.language}" => true, "allowed:#{preview_card.language}" => preview_card.trendable?, }) @@ -105,10 +105,10 @@ class Trends::Links < Trends::Base # set. We do this instead of just deleting the localized sets to avoid # having moments where the API returns empty results - redis.pipelined do + redis.pipelined do |pipeline| Trends.available_locales.each do |locale| - redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') - redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max') + pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') + pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max') end end end diff --git a/app/models/trends/statuses.rb b/app/models/trends/statuses.rb index e785413ec471bc9806074d9a3efc626c59e287f9..744fcfb938e9e0911fe5b1caf990ae69462ab186 100644 --- a/app/models/trends/statuses.rb +++ b/app/models/trends/statuses.rb @@ -97,7 +97,7 @@ class Trends::Statuses < Trends::Base end def calculate_scores(statuses, at_time) - redis.pipelined do + redis.pipelined do |pipeline| statuses.each do |status| expected = 1.0 observed = (status.reblogs_count + status.favourites_count).to_f @@ -112,14 +112,14 @@ class Trends::Statuses < Trends::Base decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f)) - add_to_and_remove_from_subsets(status.id, decaying_score, { + add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, { all: true, allowed: status.trendable? && status.account.discoverable?, }) next unless valid_locale?(status.language) - add_to_and_remove_from_subsets(status.id, decaying_score, { + add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, { "all:#{status.language}" => true, "allowed:#{status.language}" => status.trendable? && status.account.discoverable?, }) @@ -130,8 +130,8 @@ class Trends::Statuses < Trends::Base # having moments where the API returns empty results Trends.available_locales.each do |locale| - redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') - redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max') + pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') + pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max') end end end diff --git a/app/models/trends/tags.rb b/app/models/trends/tags.rb index 3caa58815d2c30a8606f1179dc6077209a2be4ef..ba2a8560f0cad791f2983bc3e8ad41029ae7ecaa 100644 --- a/app/models/trends/tags.rb +++ b/app/models/trends/tags.rb @@ -79,7 +79,7 @@ class Trends::Tags < Trends::Base decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f)) - add_to_and_remove_from_subsets(tag.id, decaying_score, { + add_to_and_remove_from_subsets(redis, tag.id, decaying_score, { all: true, allowed: tag.trendable?, }) diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index 5000062e41a5035af4a9f08c0937491864ff98e7..9d106801d5569aa9af36fcc7c1a49047bca89487 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -47,9 +47,10 @@ class BatchedRemoveStatusService < BaseService # Cannot be batched @status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago) - redis.pipelined do + + redis.pipelined do |pipeline| statuses.each do |status| - unpush_from_public_timelines(status) + unpush_from_public_timelines(pipeline, status) end end end @@ -72,22 +73,22 @@ class BatchedRemoveStatusService < BaseService end end - def unpush_from_public_timelines(status) + def unpush_from_public_timelines(pipeline, status) return unless status.public_visibility? && status.id > @status_id_cutoff payload = Oj.dump(event: :delete, payload: status.id.to_s) - redis.publish('timeline:public', payload) - redis.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload) + pipeline.publish('timeline:public', payload) + pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload) if status.media_attachments.any? - redis.publish('timeline:public:media', payload) - redis.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload) + pipeline.publish('timeline:public:media', payload) + pipeline.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload) end status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag| - redis.publish("timeline:hashtag:#{hashtag}", payload) - redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local? + pipeline.publish("timeline:hashtag:#{hashtag}", payload) + pipeline.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local? end end end