holderdeord/hdo-transcript-search

View on GitHub
indexer/lib/hdo-transcript-indexer.rb

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
require 'nokogiri'
require 'pathname'
require 'uri'
require 'logger'
require 'json'
require 'time'
require 'set'
require 'pry'
require 'forwardable'
require 'mail'

require 'hdo-transcript-indexer/text_utils'
require 'hdo-transcript-indexer/converter'
require 'hdo-transcript-indexer/cache'
require 'hdo-transcript-indexer/index'

Faraday.default_adapter = :patron
Faraday.default_connection_options.request.timeout = 30 # we sometimes see hangs in the API
Faraday.default_connection_options.headers = {
  'User-Agent' => 'hdo-transcript-downloader | https://www.holderdeord.no/'
}

module Hdo
  module Transcript
    class Indexer
      def initialize(options)
        @data_dir     = data_dir_from(options)
        @sessions     = options.fetch(:sessions)
        @faraday      = Faraday.new('http://data.stortinget.no')
        @logger       = Logger.new(STDOUT)
        @create_index = options.fetch(:create_index)
        @force        = options.fetch(:force)
        @extras       = JSON.parse(File.read(File.expand_path("../hdo-transcript-indexer/extras.json", __FILE__)))
        @ner          = options.fetch(:ner)
        @mail         = options.fetch(:mail)
        @lix          = options.fetch(:lix)

        @finished_cache = {}
        @errors       = []
        @new_transcripts = []

        if @sessions == ['all']
          @sessions = Hdo::StortingImporter::ParsingDataSource.default.
            parliament_sessions.
            select { |e| e.start_date.year >= 1998 }.
            map { |e| [e.start_date.year, e.end_date.year].join('-') }
        end

        if @ner && !system("which polyglot 2>&1 >/dev/null")
          raise "polyglot not installed, please run `pip install polyglot && polyglot download embeddings2.no ner2.no`"
        end

        @index = Index.new(
          options.fetch(:elasticsearch_url),
          options.fetch(:index_name),
          @logger
        )

        @party_cache = Cache.new(cache_path('name-to-party'), ttl_in_days: 30)
        @party_cache.load_if_exists

        @slug_cache = Cache.new(cache_path('name-to-slug'), ttl_in_days: 30)
        @slug_cache.load_if_exists

        @id_to_person = Cache.new(cache_path('id-to-person'), ttl_in_days: 30)
        @id_to_person.load_if_exists

        @stats = Hash.new { |hash, session| hash[session] = Hash.new(0) }
      end

      def execute
        download
        convert
        create_index
        index_docs
        notify

        @stats
      end

      private

      def download
        @sessions.each { |s| fetch_session s }
      end

      def convert
        build_slug_cache
        build_party_cache

        xml_transcripts.each { |input| convert_to_json(input) }
      end

      def notify
        return unless @mail
        return if @new_transcripts.empty?

        count = @new_transcripts.size
        list = @new_transcripts.map { |e| "* #{e}" }.join("\n")

        stats = @stats.sort_by { |s, _| s }.reverse.map do |session, data|
          "#{session}: #{data[:speeches]} innlegg i #{data[:transcripts]} referater"
        end.join("\n")

        content = "Nye referater lagt til:\n\n#{list}\n\n#{stats}\n\n-- https://tale.holderdeord.no/"

        Mail.deliver do
          from     'noreply@holderdeord.no'
          to       ['jari@holderdeord.no']
          subject  "#{count} nye referater"
          body     content
        end
      end

      def index_docs
        json_transcripts.each { |input| index_file(input) }
      end

      def xml_transcripts
        files_matching '{[sS]*,refs-*}.xml'
      end

      def json_transcripts
        files_matching '{[sS]*,refs-*}.json'
      end

      def files_matching(glob)
        Pathname.glob(@data_dir.join(glob)).sort_by(&:to_s)
      end

      def cache_path(name)
        p = @data_dir.join("cache/#{name}.json")
        p.dirname.mkpath unless p.dirname.exist?

        p
      end

      def build_party_cache
        return unless @party_cache.empty?

        @logger.info "building name -> party cache, this could take a while"

        xml_transcripts.each do |input_file|          
          Converter.parse(input_file.to_s, skip_errors: true).sections.each do |section|
            n = section[:name]
            p = section[:party]

            if n && p
              @party_cache[n] ||= p
            end
          end
        end

        # manually maintained list of people we can't infer from the transcript data
        @party_cache.merge!(@extras.fetch('parties'))
        @party_cache.save

        json_transcripts.each { |t| t.delete }
      end

      def build_slug_cache
        return unless @slug_cache.empty? || @id_to_person.empty?

        @logger.info "building name -> slug cache, id -> person cache"

        periods = JSON.parse(@faraday.get("http://data.stortinget.no/eksport/stortingsperioder?format=json").body).
          fetch('stortingsperioder_liste').
          map { |e| e['id'] }

        periods.each do |period|
          res = @faraday.get("http://data.stortinget.no/eksport/representanter?stortingsperiodeid=#{period}&format=json")

          if res.status != 200
            @logger.warn "unable to fetch representatives for #{period}"
            next
          end

          data = JSON.parse(res.body)

          data['representanter_liste'].each do |rep|
            full_name = rep.values_at('fornavn', 'etternavn').join(' ')

            @slug_cache[full_name] = rep['id']
            @id_to_person[rep['id']] = {
              "id": rep['id'],
              "name": full_name,
              "party": rep['parti'] ? rep['parti']['id'] : nil
            }
          end
        end

        res = @faraday.get('http://data.stortinget.no/eksport/dagensrepresentanter?format=json')
        data = JSON.parse(res.body)
        data['dagensrepresentanter_liste'].each do |rep|
          full_name = rep.values_at('fornavn', 'etternavn').join(' ')
          @slug_cache[full_name] = rep['id']

          @id_to_person[rep['id']] = {
            "id": rep['id'],
            "name": full_name,
            "party": rep['parti'] ? rep['parti']['id'] : nil
          }
        end

        # manually maintained list of people we can't infer from the transcript data
        @slug_cache.merge!(@extras.fetch('slugs'))

        @slug_cache.save
        @id_to_person.save
      end

      def data_dir_from(options)
        dir = Pathname.new(options.fetch(:data_dir))
        dir.mkpath unless dir.exist?

        dir
      end

      def fetch_session(session)
        if session.split("-").first.to_i < 2008
          # fetch the older transcripts that we were emailed
          dest = @data_dir.join('old-data.zip')

          unless dest.exist?
            @logger.info "fetching older transcripts"
            ok = system "curl", "-o", dest.to_s, "-L", "http://files.holderdeord.no/data/transcripts/cleaned/1998-2009.zip"
            ok or raise "could not fetch transcripts"

            Dir.chdir(@data_dir) do
              ok = system "unzip", "-o", dest.to_s
              ok or raise "could not unzip"
            end
          end

          FileUtils.cp_r Dir.glob(@data_dir.join("#{session}/*.xml").to_s), @data_dir.to_s
        else
          # fetch from the API
          res = @faraday.get "http://data.stortinget.no/eksport/publikasjoner?publikasjontype=referat&sesjonid=#{URI.escape session}&format=json"
          data = JSON.parse(res.body)

          data.fetch('publikasjoner_liste').each { |t| fetch_transcript t }
        end
      end

      def fetch_transcript(t)
        id   = t['id']
        dest = @data_dir.join("#{id}.xml")

        if dest.exist? && !@force && finished_transcript?(dest)
          @logger.info "download cached: #{dest}"
        else
          @new_transcripts << id
          @logger.info "fetching transcript: #{id} => #{dest}"

          res = @faraday.get("http://data.stortinget.no/eksport/publikasjon?publikasjonid=#{id}")
          dest.open('w') { |io| io << res.body }
        end
      end

      def convert_to_json(input_file)
        dest = Pathname.new(input_file.to_s.sub(input_file.extname, '.json'))

        if dest.exist? && !@force && finished_transcript?(input_file)
          @logger.info "conversion cached: #{dest}"
        else
          @logger.info "converting: #{input_file} => #{dest}"

          json = Converter.parse(
            input_file.to_s,
            cache: @party_cache,
            id_to_person: @id_to_person,
            names: @extras.fetch('names'),
            transitions: @extras.fetch('transitions'),
            ner: @ner,
            lix: @lix
          ).to_json

          dest.open('w') { |io| io << json }
        end
      end

      def index_file(file)
        transcript_id = file.basename.to_s.sub(file.extname, '')
        data          = JSON.parse(file.read)
        presidents    = data['presidents'].map { |e| {name: e, external_id: @slug_cache[e]} }
        session       = data['session']

        unless @sessions.include?(session)
          return
        end

        @stats[session][:transcripts] += 1

        docs = data['sections'].map.with_index do |section, idx|
          @stats[session][:speeches] += 1

          id = "#{transcript_id}-#{idx}"

          doc = {
            'presidents'  => presidents,
            'session'     => data['session'],
            'transcript'  => transcript_id,
            'order'       => idx,
            'external_id' => @slug_cache[section['name']],
          }.merge(section)

          [id, doc]
        end

        @index.index docs

        if data['errors']
          data['errors'].each do |err|
            @logger.error err.inspect
          end
        end

        @logger.info "indexed #{file}"
      end

      def finished_transcript?(file)
        if !@finished_cache.key?(file.to_s)
          doc = Nokogiri::XML.parse(file.read)
          node = doc.css('Forhandlinger').first

          @finished_cache[file.to_s] = node.nil? || (node.attr('Status') == 'Komplett')
        end

        @finished_cache[file.to_s]
      rescue => err
        @logger.error err.inspect

        false
      end

      def create_index
        if @create_index || !@index.exists?
          @logger.info "creating index #{@index_name}"
          @index.recreate!
        end
      end
    end
  end
end