# encoding: UTF-8
require 'redis'
require 'redis/connection/hiredis'
require 'redis-queue'
require 'polipus/version'
require 'polipus/http'
require 'polipus/storage'
require 'polipus/url_tracker'
require 'polipus/plugin'
require 'polipus/queue_overflow'
require 'polipus/robotex'
require 'polipus/signal_handler'
require 'thread'
require 'logger'
require 'json'

module Polipus
  def self.crawler(job_name = 'polipus', urls = [], options = {}, &block)
    PolipusCrawler.crawl(job_name, urls, options, &block)
  end
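
  # Illustrative usage of Polipus.crawler (a minimal sketch; the job name,
  # URL, and callback body below are hypothetical, not part of the library):
  #
  #   Polipus.crawler('example_job', 'http://www.example.com/') do |crawler|
  #     crawler.on_page_downloaded do |page|
  #       puts "Fetched #{page.url} [#{page.code}]"
  #     end
  #   end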

  class PolipusCrawler
    OPTS = {
      # run 4 threads
      workers: 4,
      # identify self as Polipus/VERSION
      user_agent: "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
      # by default, don't limit the depth of the crawl
      depth_limit: false,
      # number of times HTTP redirects will be followed
      redirect_limit: 5,
      # storage engine defaults to DevNull
      storage: nil,
      # proxy server hostname
      proxy_host: nil,
      # proxy server port number
      proxy_port: false,
      # proxy server username
      proxy_user: nil,
      # proxy server password
      proxy_pass: nil,
      # HTTP read timeout in seconds
      read_timeout: 30,
      # HTTP open connection timeout in seconds
      open_timeout: 10,
      # Time to wait for new messages on Redis
      # After this timeout, the current crawling session is marked as terminated
      queue_timeout: 30,
      # A URL tracker instance. Default is a Redis-backed Bloom filter
      url_tracker: nil,
      # A Redis options hash that will be passed directly to Redis.new
      redis_options: {},
      # An instance of logger
      logger: nil,
      # A logger level
      logger_level: nil,
      # whether the query string should be included in the saved page
      include_query_string_in_saved_page: true,
      # Max number of items to keep on redis
      queue_items_limit: 2_000_000,
      # The adapter used to store Redis items that exceed queue_items_limit
      queue_overflow_adapter: nil,
      # Every x seconds, the main queue is checked for overflowed items
      queue_overflow_manager_check_time: 60,
      # If true, each page downloaded will increment a counter on redis
      stats_enabled: false,
      # Cookie jar strategy
      cookie_jar: nil,
      # whether or not to accept cookies
      accept_cookies: false,
      # A set of hosts that should be considered part of the same domain.
      # E.g. it can be used to follow links with and without the 'www' prefix
      domain_aliases: [],
      # Mark a connection as stale after connection_max_hits requests
      connection_max_hits: nil,
      # Page TTL: mark a page as expired after ttl_page seconds
      ttl_page: nil,
      # whether to obey the robots exclusion protocol (robots.txt)
      obey_robots_txt: false,
      # If true, the signal handling strategy is enabled:
      # INT and TERM signals will stop Polipus gracefully.
      # Disable it if Polipus runs as part of a Resque or DelayedJob-like system
      enable_signal_handler: true
    }
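
    # Any of the options above can be overridden via the options hash passed
    # to the constructor. A minimal sketch (the job name, URL and values are
    # hypothetical):
    #
    #   PolipusCrawler.new('example_job', ['http://www.example.com/'],
    #                      workers: 8,
    #                      depth_limit: 2,
    #                      obey_robots_txt: true,
    #                      redis_options: { host: 'localhost', port: 6379 })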

    attr_reader :storage
    attr_reader :job_name
    attr_reader :logger
    attr_reader :options
    attr_reader :crawler_name

    OPTS.keys.each do |key|
      define_method "#{key}=" do |value|
        @options[key.to_sym] = value
      end
      define_method "#{key}" do
        @options[key.to_sym]
      end
    end
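
    # The accessors generated above also allow options to be tweaked after
    # construction, typically inside the configuration block. A minimal
    # sketch (job name, URL and values are hypothetical):
    #
    #   Polipus.crawler('example_job', 'http://www.example.com/') do |crawler|
    #     crawler.depth_limit = 3
    #     crawler.stats_enabled = true
    #   end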

    def initialize(job_name = 'polipus', urls = [], options = {})
      @job_name     = job_name
      @options      = OPTS.merge(options)
      @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
      @logger       = @options[:logger]  ||= Logger.new(nil)

      unless @logger.class.to_s == 'Log4r::Logger'
        @logger.level = @options[:logger_level] ||= Logger::INFO
      end

      @storage      = @options[:storage] ||= Storage.dev_null

      @workers_pool = []

      @follow_links_like  = []
      @skip_links_like    = []
      @on_page_downloaded = []
      @on_before_save     = []
      @on_page_error      = []
      @focus_crawl_block  = nil
      @on_crawl_start     = []
      @on_crawl_end       = []
      @redis_factory      = nil

      @overflow_manager = nil
      @crawler_name = `hostname`.strip + "-#{@job_name}"

      @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]

      @urls = [urls].flatten.map { |url| URI(url) }
      @urls.each { |url| url.path = '/' if url.path.empty? }
      if @options[:obey_robots_txt]
        @robots =
          if @options[:user_agent].respond_to?(:sample)
            Polipus::Robotex.new(@options[:user_agent].sample)
          else
            Polipus::Robotex.new(@options[:user_agent])
          end
      end
      # Attach signal handling if enabled
      SignalHandler.enable if @options[:enable_signal_handler]

      if queue_overflow_adapter
        @on_crawl_start << lambda do |_|
          Thread.new do
            Thread.current[:name] = :overflow_items_controller
            overflow_items_controller.run
          end
        end
      end

      @on_crawl_end << lambda do |_|
        Thread.list.select { |thread| thread.status && thread[:name] == :overflow_items_controller }.each(&:kill)
      end

      execute_plugin 'on_initialize'

      yield self if block_given?
    end

    def self.crawl(*args, &block)
      new(*args, &block).takeover
    end

    def takeover
      @urls.each do |u|
        add_url(u) { |page| page.user_data.p_seeded = true }
      end
      return if internal_queue.empty?

      @on_crawl_start.each { |e| e.call(self) }

      execute_plugin 'on_crawl_start'
      @options[:workers].times do |worker_number|
        @workers_pool << Thread.new do
          @logger.debug { "Start worker #{worker_number}" }
          http  =  HTTP.new(@options)
          queue =  queue_factory
          queue.process(false, @options[:queue_timeout]) do |message|
            next if message.nil?

            execute_plugin 'on_message_received'

            page = Page.from_json message

            unless should_be_visited?(page.url, false)
              @logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
              queue.commit
              next
            end

            if page_exists? page
              @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
              queue.commit
              next
            end

            url = page.url.to_s
            @logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }

            execute_plugin 'on_before_download'

            pages = http.fetch_pages(url, page.referer, page.depth, page.user_data)
            if pages.count > 1
              rurls = pages.map { |e| e.url.to_s }.join(' --> ')
              @logger.info { "Got redirects! #{rurls}" }
              page = pages.pop
              page.aliases = pages.map(&:url)
              if page_exists? page
                @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
                queue.commit
                next
              end
            else
              page = pages.last
            end

            execute_plugin 'on_after_download'

            if page.error
              @logger.warn { "Page #{page.url} has error: #{page.error}" }
              incr_error
              @on_page_error.each { |e| e.call(page) }
            end

            # Execute on_before_save blocks
            @on_before_save.each { |e| e.call(page) }

            @storage.add(page) if page.storable?

            @logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
            @logger.info  { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }

            incr_pages

            # Execute on_page_downloaded blocks
            @on_page_downloaded.each { |e| e.call(page) }

            if @options[:depth_limit] == false || @options[:depth_limit] > page.depth
              links_for(page).each do |url_to_visit|
                next unless should_be_visited?(url_to_visit)
                enqueue url_to_visit, page
              end
            else
              @logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
            end

            @logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
            @overflow_manager.perform if @overflow_manager && queue.empty?
            execute_plugin 'on_message_processed'

            if SignalHandler.terminated?
              @logger.info { 'About to exit! Thanks for using Polipus' }
              queue.commit
              break
            end
            true
          end
        end
      end

      @workers_pool.each(&:join)
      @on_crawl_end.each { |e| e.call(self) }
      execute_plugin 'on_crawl_end'
    end

    # A pattern or an array of patterns can be passed as arguments
    # A URL will be discarded unless it matches at least one pattern
    def follow_links_like(*patterns)
      @follow_links_like += patterns.uniq.compact
      self
    end

    # A pattern or an array of patterns can be passed as arguments
    # A URL will be discarded if it matches any of the patterns
    def skip_links_like(*patterns)
      @skip_links_like += patterns.uniq.compact
      self
    end
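
    # Illustrative use of the two filters above (a sketch; the patterns are
    # hypothetical, and `crawler` stands for the instance yielded by
    # Polipus.crawler):
    #
    #   crawler.follow_links_like(/\/articles\//)
    #   crawler.skip_links_like(/\.(pdf|zip)$/i, /\/login/)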

    # A block of code will be executed on every page downloaded
    # The block takes the page as its argument
    def on_page_downloaded(&block)
      @on_page_downloaded << block
      self
    end
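
    # Illustrative on_page_downloaded callback (a sketch; the block body is
    # hypothetical, and `crawler` stands for the instance yielded by
    # Polipus.crawler):
    #
    #   crawler.on_page_downloaded do |page|
    #     puts "#{page.url} downloaded (code: #{page.code}, depth: #{page.depth})"
    #   end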

    # A block of code will be executed when the crawl session is over
    def on_crawl_end(&block)
      @on_crawl_end << block
      self
    end

    # A block of code will be executed when the crawl session starts
    def on_crawl_start(&block)
      @on_crawl_start << block
      self
    end

    # A block of code will be executed on every page downloaded
    # before the page is saved to the registered storage
    def on_before_save(&block)
      @on_before_save << block
      self
    end
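
    # Illustrative on_before_save hook (a sketch; tagging pages via user_data
    # is a hypothetical use, and `crawler` stands for the instance yielded by
    # Polipus.crawler):
    #
    #   crawler.on_before_save do |page|
    #     page.user_data.processed_at = Time.now
    #   end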

    # A block of code will be executed whenever a page contains an error
    def on_page_error(&block)
      @on_page_error << block
      self
    end
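
    # Illustrative on_page_error handler (a sketch; the block body is
    # hypothetical, and `crawler` stands for the instance yielded by
    # Polipus.crawler):
    #
    #   crawler.on_page_error do |page|
    #     STDERR.puts "Error on #{page.url}: #{page.error}"
    #   end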

    # A block of code executed on every page downloaded.
    # The block is used to extract the URLs to visit;
    # see the links_for method
    def focus_crawl(&block)
      @focus_crawl_block = block
      self
    end
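
    # Illustrative focus_crawl block (a sketch; the filtering rule is
    # hypothetical, and `crawler` stands for the instance yielded by
    # Polipus.crawler):
    #
    #   crawler.focus_crawl do |page|
    #     # Keep only the links whose path looks interesting for this crawl
    #     page.links.select { |link| link.path.to_s.start_with?('/articles') }
    #   end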

    def redis_options
      @options[:redis_options]
    end

    def queue_size
      internal_queue.size
    end

    def stats_reset!
      ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
    end

    def redis_factory(&block)
      @redis_factory = block
      self
    end
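
    # Illustrative redis_factory override (a sketch; forcing the hiredis
    # driver is a hypothetical choice, and `crawler` stands for the instance
    # yielded by Polipus.crawler):
    #
    #   crawler.redis_factory do |redis_options|
    #     Redis.new(redis_options.merge(driver: :hiredis))
    #   end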

    def url_tracker
      @url_tracker ||=
        @options[:url_tracker] ||=
          UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}",
                                 redis: redis_factory_adapter,
                                 driver: 'lua')
    end

    def redis
      @redis ||= redis_factory_adapter
    end

    def add_to_queue(page)
      if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
        add_url(page.url, referer: page.referer, depth: page.depth)
      else
        add_url(page)
      end
    end

    # Enqueue a URL, no matter what
    def add_url(url, params = {})
      page = Page.new(url, params)
      yield(page) if block_given?
      internal_queue << page.to_json
    end
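
    # Illustrative add_url call with per-page user data (a sketch; the URL
    # and user_data attribute are hypothetical, and `crawler` stands for the
    # instance yielded by Polipus.crawler):
    #
    #   crawler.add_url('http://www.example.com/sitemap') do |page|
    #     page.user_data.from_sitemap = true
    #   end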

    # Request Polipus to stop its work (gracefully)
    # clear_queue = true if you want to delete all of the pending urls to visit
    def stop!(clear_queue = false)
      SignalHandler.terminate
      internal_queue.clear(true) if clear_queue
    end
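
    # Illustrative stop! usage (a sketch; `crawler` stands for the instance
    # yielded by Polipus.crawler):
    #
    #   crawler.stop!        # finish in-flight work, keep the queue
    #   crawler.stop!(true)  # also clear every pending URL from the queue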

    private

    # URL enqueueing policy
    def should_be_visited?(url, with_tracker = true)
      case
      # robots.txt
      when !allowed_by_robot?(url)
        false
      # Check against whitelist pattern matching
      when !@follow_links_like.empty? && @follow_links_like.none? { |p| url.path =~ p }
        false
      # Check against blacklist pattern matching
      when @skip_links_like.any? { |p| url.path =~ p }
        false
      # Page is marked as expired, so visit it again
      when page_expired?(Page.new(url))
        true
      # Check against url tracker
      when with_tracker && url_tracker.visited?(@options[:include_query_string_in_saved_page] ? url.to_s : url.to_s.gsub(/\?.*$/, ''))
        false
      else
        true
      end
    end

    # It extracts URLs from the page
    def links_for(page)
      page.domain_aliases = domain_aliases
      @focus_crawl_block.nil? ? page.links : @focus_crawl_block.call(page)
    end

    # whether a page is expired or not
    def page_expired?(page)
      return false if @options[:ttl_page].nil?
      stored_page = @storage.get(page)
      r = stored_page && stored_page.expired?(@options[:ttl_page])
      @logger.debug { "Page #{page.url} marked as expired" } if r
      r
    end

    # whether a page exists or not
    def page_exists?(page)
      return false if page.user_data && page.user_data.p_seeded
      @storage.exists?(page) && !page_expired?(page)
    end

    #
    # Returns +true+ if we are obeying robots.txt and the link
    # is granted access in it. Always returns +true+ when we are
    # not obeying robots.txt.
    #
    def allowed_by_robot?(link)
      return true if @robots.nil?
      @options[:obey_robots_txt] ? @robots.allowed?(link) : true
    end

    # The URL is enqueued for a later visit
    def enqueue(url_to_visit, current_page)
      page_to_visit = Page.new(url_to_visit.to_s, referer: current_page.url.to_s, depth: current_page.depth + 1)
      internal_queue << page_to_visit.to_json
      to_track = @options[:include_query_string_in_saved_page] ? url_to_visit.to_s : url_to_visit.to_s.gsub(/\?.*$/, '')
      url_tracker.visit to_track
      @logger.debug { "Added (#{url_to_visit}) to the queue" }
    end

    # It creates a redis client
    def redis_factory_adapter
      if @redis_factory
        @redis_factory.call(redis_options)
      else
        Redis.new(redis_options)
      end
    end

    # It creates a new distributed queue
    def queue_factory
      Redis::Queue.new("polipus_queue_#{@job_name}", "bp_polipus_queue_#{@job_name}", redis: redis_factory_adapter)
    end

    # If stats are enabled, it increments the error counter
    def incr_error
      redis.incr "polipus:#{@job_name}:errors" if @options[:stats_enabled]
    end

    # If stats are enabled, it increments the downloaded-pages counter
    def incr_pages
      redis.incr "polipus:#{@job_name}:pages" if @options[:stats_enabled]
    end

    # It handles the overflow item policy (if any)
    def overflow_items_controller
      @overflow_manager = QueueOverflow::Manager.new(self, queue_factory, @options[:queue_items_limit])

      # Over time, the URL policy may change, so it is re-evaluated for each overflowed item
      @overflow_manager.url_filter do |page|
        should_be_visited?(page.url, false)
      end

      QueueOverflow::Worker.new(@overflow_manager)
    end

    def internal_queue
      @internal_queue ||= queue_factory
    end

    # It invokes a plugin hook method, if defined
    def execute_plugin(method)
      Polipus::Plugin.plugins.each do |k, p|
        next unless p.respond_to?(method)
        @logger.info { "Running plugin method #{method} on #{k}" }
        ret_val = p.send(method, self)
        instance_eval(&ret_val) if ret_val.is_a? Proc
      end
    end
  end
end