Fix posts from threads received out-of-order sometimes not being inserted into timelines (#27653)
This commit is contained in:
		
							parent
							
								
									2aa28e06d1
								
							
						
					
					
						commit
						0337df3a42
					
				| 
						 | 
				
			
			@ -8,6 +8,7 @@ class FanOutOnWriteService < BaseService
 | 
			
		|||
  # @param [Hash] options
 | 
			
		||||
  # @option options [Boolean] update
 | 
			
		||||
  # @option options [Array<Integer>] silenced_account_ids
 | 
			
		||||
  # @option options [Boolean] skip_notifications
 | 
			
		||||
  def call(status, options = {})
 | 
			
		||||
    @status    = status
 | 
			
		||||
    @account   = status.account
 | 
			
		||||
| 
						 | 
				
			
			@ -37,8 +38,11 @@ class FanOutOnWriteService < BaseService
 | 
			
		|||
 | 
			
		||||
  def fan_out_to_local_recipients!
 | 
			
		||||
    deliver_to_self!
 | 
			
		||||
    notify_mentioned_accounts!
 | 
			
		||||
    notify_about_update! if update?
 | 
			
		||||
 | 
			
		||||
    unless @options[:skip_notifications]
 | 
			
		||||
      notify_mentioned_accounts!
 | 
			
		||||
      notify_about_update! if update?
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    case @status.visibility.to_sym
 | 
			
		||||
    when :public, :unlisted, :private
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,13 +7,18 @@ class ThreadResolveWorker
 | 
			
		|||
  sidekiq_options queue: 'pull', retry: 3
 | 
			
		||||
 | 
			
		||||
  def perform(child_status_id, parent_url, options = {})
 | 
			
		||||
    child_status  = Status.find(child_status_id)
 | 
			
		||||
    parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
 | 
			
		||||
    child_status = Status.find(child_status_id)
 | 
			
		||||
    return if child_status.in_reply_to_id.present?
 | 
			
		||||
 | 
			
		||||
    parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
 | 
			
		||||
    parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
 | 
			
		||||
 | 
			
		||||
    return if parent_status.nil?
 | 
			
		||||
 | 
			
		||||
    child_status.thread = parent_status
 | 
			
		||||
    child_status.save!
 | 
			
		||||
 | 
			
		||||
    DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window?
 | 
			
		||||
  rescue ActiveRecord::RecordNotFound
 | 
			
		||||
    true
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -23,6 +23,109 @@ RSpec.describe ActivityPub::Activity::Create do
 | 
			
		|||
    stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe 'processing posts received out of order' do
 | 
			
		||||
    let(:follower) { Fabricate(:account, username: 'bob') }
 | 
			
		||||
 | 
			
		||||
    let(:object_json) do
 | 
			
		||||
      {
 | 
			
		||||
        id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'),
 | 
			
		||||
        type: 'Note',
 | 
			
		||||
        to: [
 | 
			
		||||
          'https://www.w3.org/ns/activitystreams#Public',
 | 
			
		||||
          ActivityPub::TagManager.instance.uri_for(follower),
 | 
			
		||||
        ],
 | 
			
		||||
        content: '@bob lorem ipsum',
 | 
			
		||||
        published: 1.hour.ago.utc.iso8601,
 | 
			
		||||
        updated: 1.hour.ago.utc.iso8601,
 | 
			
		||||
        tag: {
 | 
			
		||||
          type: 'Mention',
 | 
			
		||||
          href: ActivityPub::TagManager.instance.uri_for(follower),
 | 
			
		||||
        },
 | 
			
		||||
      }
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    let(:reply_json) do
 | 
			
		||||
      {
 | 
			
		||||
        id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'),
 | 
			
		||||
        type: 'Note',
 | 
			
		||||
        inReplyTo: object_json[:id],
 | 
			
		||||
        to: [
 | 
			
		||||
          'https://www.w3.org/ns/activitystreams#Public',
 | 
			
		||||
          ActivityPub::TagManager.instance.uri_for(follower),
 | 
			
		||||
        ],
 | 
			
		||||
        content: '@bob lorem ipsum',
 | 
			
		||||
        published: Time.now.utc.iso8601,
 | 
			
		||||
        updated: Time.now.utc.iso8601,
 | 
			
		||||
        tag: {
 | 
			
		||||
          type: 'Mention',
 | 
			
		||||
          href: ActivityPub::TagManager.instance.uri_for(follower),
 | 
			
		||||
        },
 | 
			
		||||
      }
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    def activity_for_object(json)
 | 
			
		||||
      {
 | 
			
		||||
        '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
        id: [json[:id], 'activity'].join('/'),
 | 
			
		||||
        type: 'Create',
 | 
			
		||||
        actor: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
        object: json,
 | 
			
		||||
      }.with_indifferent_access
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    before do
 | 
			
		||||
      follower.follow!(sender)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    around do |example|
 | 
			
		||||
      Sidekiq::Testing.fake! do
 | 
			
		||||
        example.run
 | 
			
		||||
        Sidekiq::Worker.clear_all
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'correctly processes posts and inserts them in timelines', :aggregate_failures do
 | 
			
		||||
      # Simulate a temporary failure preventing from fetching the parent post
 | 
			
		||||
      stub_request(:get, object_json[:id]).to_return(status: 500)
 | 
			
		||||
 | 
			
		||||
      # When receiving the reply…
 | 
			
		||||
      described_class.new(activity_for_object(reply_json), sender, delivery: true).perform
 | 
			
		||||
 | 
			
		||||
      # NOTE: Refering explicitly to the workers is a bit awkward
 | 
			
		||||
      DistributionWorker.drain
 | 
			
		||||
      FeedInsertWorker.drain
 | 
			
		||||
 | 
			
		||||
      # …it creates a status with an unknown parent
 | 
			
		||||
      reply = Status.find_by(uri: reply_json[:id])
 | 
			
		||||
      expect(reply.reply?).to be true
 | 
			
		||||
      expect(reply.in_reply_to_id).to be_nil
 | 
			
		||||
 | 
			
		||||
      # …and creates a notification
 | 
			
		||||
      expect(LocalNotificationWorker.jobs.size).to eq 1
 | 
			
		||||
 | 
			
		||||
      # …but does not insert it into timelines
 | 
			
		||||
      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil
 | 
			
		||||
 | 
			
		||||
      # When receiving the parent…
 | 
			
		||||
      described_class.new(activity_for_object(object_json), sender, delivery: true).perform
 | 
			
		||||
 | 
			
		||||
      Sidekiq::Worker.drain_all
 | 
			
		||||
 | 
			
		||||
      # …it creates a status and insert it into timelines
 | 
			
		||||
      parent = Status.find_by(uri: object_json[:id])
 | 
			
		||||
      expect(parent.reply?).to be false
 | 
			
		||||
      expect(parent.in_reply_to_id).to be_nil
 | 
			
		||||
      expect(reply.reload.in_reply_to_id).to eq parent.id
 | 
			
		||||
 | 
			
		||||
      # Check that the both statuses have been inserted into the home feed
 | 
			
		||||
      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f)
 | 
			
		||||
      expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f)
 | 
			
		||||
 | 
			
		||||
      # Creates two notifications
 | 
			
		||||
      expect(Notification.count).to eq 2
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe '#perform' do
 | 
			
		||||
    context 'when fetching' do
 | 
			
		||||
      subject { described_class.new(json, sender) }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue