115 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			115 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
require_relative './connection_pool/shared_connection_pool'
 | 
						|
 | 
						|
class RequestPool
 | 
						|
  def self.current
 | 
						|
    @current ||= RequestPool.new
 | 
						|
  end
 | 
						|
 | 
						|
  class Reaper
 | 
						|
    attr_reader :pool, :frequency
 | 
						|
 | 
						|
    def initialize(pool, frequency)
 | 
						|
      @pool      = pool
 | 
						|
      @frequency = frequency
 | 
						|
    end
 | 
						|
 | 
						|
    def run
 | 
						|
      return unless frequency&.positive?
 | 
						|
 | 
						|
      Thread.new(frequency, pool) do |t, p|
 | 
						|
        loop do
 | 
						|
          sleep t
 | 
						|
          p.flush
 | 
						|
        end
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  MAX_IDLE_TIME = 30
 | 
						|
  WAIT_TIMEOUT  = 5
 | 
						|
  MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i
 | 
						|
 | 
						|
  class Connection
 | 
						|
    attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh
 | 
						|
 | 
						|
    def initialize(site)
 | 
						|
      @site         = site
 | 
						|
      @http_client  = http_client
 | 
						|
      @last_used_at = nil
 | 
						|
      @created_at   = current_time
 | 
						|
      @dead         = false
 | 
						|
      @fresh        = true
 | 
						|
    end
 | 
						|
 | 
						|
    def use
 | 
						|
      @last_used_at = current_time
 | 
						|
      @in_use       = true
 | 
						|
 | 
						|
      retries = 0
 | 
						|
 | 
						|
      begin
 | 
						|
        yield @http_client
 | 
						|
      rescue HTTP::ConnectionError
 | 
						|
        # It's possible the connection was closed, so let's
 | 
						|
        # try re-opening it once
 | 
						|
 | 
						|
        close
 | 
						|
 | 
						|
        if @fresh || retries.positive?
 | 
						|
          raise
 | 
						|
        else
 | 
						|
          @http_client = http_client
 | 
						|
          retries     += 1
 | 
						|
          retry
 | 
						|
        end
 | 
						|
      rescue StandardError
 | 
						|
        # If this connection raises errors of any kind, it's
 | 
						|
        # better if it gets reaped as soon as possible
 | 
						|
 | 
						|
        close
 | 
						|
        @dead = true
 | 
						|
        raise
 | 
						|
      end
 | 
						|
    ensure
 | 
						|
      @fresh  = false
 | 
						|
      @in_use = false
 | 
						|
    end
 | 
						|
 | 
						|
    def seconds_idle
 | 
						|
      current_time - (@last_used_at || @created_at)
 | 
						|
    end
 | 
						|
 | 
						|
    def close
 | 
						|
      @http_client.close
 | 
						|
    end
 | 
						|
 | 
						|
    private
 | 
						|
 | 
						|
    def http_client
 | 
						|
      Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME)
 | 
						|
    end
 | 
						|
 | 
						|
    def current_time
 | 
						|
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  def initialize
 | 
						|
    @pool   = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) }
 | 
						|
    @reaper = Reaper.new(self, 30)
 | 
						|
    @reaper.run
 | 
						|
  end
 | 
						|
 | 
						|
  def with(site, &block)
 | 
						|
    @pool.with(site) do |connection|
 | 
						|
      ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do
 | 
						|
        connection.use(&block)
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  delegate :size, :flush, to: :@pool
 | 
						|
end
 |