# npolar/api.npolar.no (View on GitHub)
# File: lib/npolar/storage/couch.rb
#
# Summary
# Maintainability: D (2 days)
# Test Coverage:
# encoding: utf-8
require "rack/client"

module Npolar
  module Storage

    # CouchDB storage client.
    #
    # Reads go to the +read+ database, writes (PUT/POST/DELETE) to the +write+
    # database; +write+ defaults to +read+. Relative database names are
    # expanded against Couch.uri when that is an http(s) URI.
    #
    # Methods return either Rack-style triplets ([status, headers, body]) or
    # Rack::Response objects (kept as-is for backward compatibility).
    #
    # TODO: Needs drying, esp. share code between POST and PUT (before: valid?)
    class Couch

      # Delegate validation to model
      #extend Forwardable # http://www.ruby-doc.org/stdlib-1.9.3/libdoc/forwardable/rdoc/Forwardable.html
      #def_delegators :model, :valid?

      # Matches a body that is a JSON array, including pretty-printed arrays
      # spanning several lines. Anchored to the whole string (\A ... \z, /m) so
      # a nested array on its own line inside a JSON object is NOT mistaken
      # for a top-level array.
      JSON_ARRAY_REGEX = /\A\s*\[.*\]\s*\z/m

      # Matches a CouchDB multi-key query body: { "keys": [...] }
      ALL_DOCS_QUERY_REGEX = /\A\s*\{\s*"keys"\s*:/

      # CouchDB design documents have ids starting with "_design/"
      DESIGN_DOCUMENT_REGEX = /\A_design\//

      # Default maximum number of rows requested from _all_docs
      LIMIT = 1000000

      HEADERS = {
        "Accept" => "application/json",
        #Accept-Encoding?
        "Content-Type" => "application/json; charset=utf-8",
        "User-Agent" => self.name }

      attr_accessor :accepts, :client, :headers, :read, :write, :formats, :model, :limit

      # Set the CouchDB base URI used to expand relative database names,
      # e.g. "http://localhost:5984". One trailing slash is removed.
      def self.uri=(uri)
        if uri.respond_to? :gsub
          uri = uri.gsub(/[\/]$/, "")
        end
        @@uri = uri
      end

      # @return the configured CouchDB base URI.
      # NOTE(review): when never set, this falls back to the URI *module*
      # (not a string), which means relative database names stay unexpanded.
      def self.uri
        @@uri ||= URI  #"http://localhost:5984"
      end

      # @return [Array<String>] accepted request media types
      def accepts
        @accepts ||= ["application/json"]
      end

      # @param read [String, Hash] read database URI/name, or a Hash with a
      #   "read" key (and optionally a "write" key)
      # @param write [String, nil] write database URI/name; defaults to read
      # @param config [Hash] options; a positive :limit sets the default row
      #   limit used for _all_docs requests
      def initialize(read, write = nil, config = {})
        if read.respond_to?(:key?) && read.key?("read")
          write = read["write"] if write.nil? && read.key?("write")
          read = read["read"]
        end
        @read = absolute_uri(strip_trailing_slash(read))
        @write = absolute_uri(strip_trailing_slash(write.nil? ? read : write))

        # Own copy, so mutating one instance's headers never alters HEADERS
        @headers = HEADERS.dup

        if config.key?(:limit) && config[:limit].to_i > 0
          @limit = config[:limit]
        end
      end

      # DELETE document
      # @param id [String]
      # @param params [Hash] query parameters; "rev" selects the revision to
      #   delete (defaults to the current revision). Not mutated.
      # @return [Rack::Response] on success the body is the deleted document,
      #   otherwise CouchDB's error body
      # @raise [ArgumentError] when id is nil or blank
      def delete(id, params={})
        if id.nil? || id.size < 1
          raise ArgumentError, "Missing or blank id"
        end
        params = params.dup

        # params are query parameters (e.g. ?rev=...), not HTTP headers
        response = reader.get(id, headers, params)

        if response.status == 200
          deleted_document = response.body
          # If user did not provide "rev" parameter, use the last revision
          unless params.key?("rev")
            params["rev"] = response.headers["Etag"].gsub(/["]/, "")
          end

          log.debug "Before DELETE #{id} rev=#{params['rev']} [#{self.class.name}]"
          response = writer.delete(id, headers, params)
          # Only report the deleted document if the DELETE actually succeeded
          if [200, 202].include?(response.status)
            response.body = deleted_document
          end
        end
        Rack::Response.new(response.body, response.status, response.headers)
      end

      # @return [Array<String>] supported formats
      def formats
        @formats ||= ["json"]
      end

      # GET a document, or one of the special resources:
      # "" / nil / "_meta", "_ids", "_feed" / "_all", "_valid", "_invalid",
      # "_validate", "_bundle", "_view" (requires params["name"] = "ddoc/fx").
      #
      # A positive params["limit"] overrides the row limit for THIS call only;
      # the previous limit is restored afterwards.
      # @raise [ArgumentError] for "_view" without a "ddoc/function" name
      def get(id, params={})
        previous_limit = @limit
        if params["limit"].to_i > 0
          @limit = params["limit"].to_i
        end

        begin
          case id
          when nil, "", "_meta" then meta
          when "_ids" then ids
          when "_feed", "_all" then feed(params)
          when "_invalid" then valid(false, params)
          when "_valid" then valid(true, params)
          when "_validate" then validate(params)
          when "_bundle" then bundle(params)
          when "_view"
            unless params["name"] =~ /\//
              raise ArgumentError, "Please provide view name like _view?name={design_document/function}"
            end
            ddoc, map_fx = params["name"].split("/")
            view(ddoc, map_fx, params)
          else
            response = reader.get(id, headers, params)
            [response.status, response.headers, response.body]
          end
        ensure
          # Keep the per-request limit from leaking into later requests
          @limit = previous_limit
        end
      end

      # HEAD a document
      # @return [Array] [status, headers, body]
      def head(id, params={})
        # FIXME => 500
        response = reader.head(id, headers, params)
        [response.status, response.headers, response.body]
      end

      # @return [Hash] request headers sent to CouchDB
      def headers
        @headers ||= HEADERS.dup
      end

      # Documents whose model validity equals +cond+ (true => valid ones).
      # @return [Rack::Response] JSON array of documents
      def valid(cond=true, params=nil)
        matching = all.select { |d| cond == model.class.new(d).valid? }
        Rack::Response.new(Yajl::Encoder.encode(matching), 200, {"Content-Type" => HEADERS["Content-Type"]})
      end

      # Validation report for all documents.
      # @return [Rack::Response] JSON {"errors": [{id => model errors}, ...]}
      def validate(params)
        report = []
        all.each do |d|
          m = model.class.new(d)
          report << { d[:id] => m.errors } if false == m.valid?
        end
        body = Yajl::Encoder.encode({ "errors" => report })
        Rack::Response.new(body, 200, {"Content-Type" => HEADERS["Content-Type"]})
      end

      # All (non-design) documents, with symbolized keys.
      # @param uri [String, nil] _all_docs URI; defaults to all_docs_uri(true)
      # @raise [RuntimeError] on a non-200 CouchDB response
      def all(uri=nil)
        uri = all_docs_uri(true) if uri.nil?
        response = couch.get(uri)
        unless 200 == response.status
          raise "HTTP error: #{response.status}"
        end
        rows = Yajl::Parser.parse(response.body, :symbolize_keys => true)[:rows]
        rows.map { |row| row[:doc] }.compact.select { |d| d.key?(:_id) && d[:_id] !~ DESIGN_DOCUMENT_REGEX }
      end

      # POST one document (JSON string or Hash), a JSON array of documents
      # (bulk), or a {"keys": [...]} multi-key query.
      # With params["overwrite"] == "true", a write conflict is resolved by
      # retrying with the current revision.
      # @return [Array] [status, headers, body]
      def post(data, params={})
        #unless valid? data
        #  raise Exception
        #end

        if string_matching?(data, ALL_DOCS_QUERY_REGEX)
          # XXX ugly hack to route request to right place
          return fetch_many(data, true)
        elsif string_matching?(data, JSON_ARRAY_REGEX)
          return post_many(data, params)
        end

        doc = data.is_a?(Hash) ? data : Yajl::Parser.parse(data)
        doc = self.class.force_ids(doc)
        # Turn POST into PUT so that we get a real UUID id?
        response = writer.put(doc["id"], headers, doc.to_json)

        # if conflict, and overwrite=true, autoresolve it
        if 409 == response.status && params["overwrite"] == "true"
          result = Yajl::Parser.parse(response.body)
          if result["error"] == "conflict"
            doc = update_revision(doc)
            response = writer.put(doc["id"], headers, doc.to_json)
          end
        end

        if 201 == response.status
          # Return the stored document; "rev" is a query parameter
          result = Yajl::Parser.parse(response.body)
          response = reader.get(result["id"], headers, { "rev" => result["rev"] })
        end
        response.headers["Content-Type"] = "application/json"
        [response.status, response.headers, response.body]
      end

      # PUT a document.
      # @param id [String] UUID or SHA1 hash, e.g. "69f3f072-27a0-4d25-a5bf-aac8f7e31d8f"
      # @param data [String, Hash] JSON data
      # @param params [Hash] "overwrite" => "true" resolves a conflict by
      #   retrying with the current revision
      # @return [Array] [status, headers, body]; on 201 the body is the stored
      #   document
      # @todo force "_id"
      def put(id, data, params={})
        data = data.to_json if data.is_a?(Hash)
        #unless valid? data
        #  raise Exception
        #end
        response = writer.put(id, headers, data)

        # if conflict, and overwrite=true, autoresolve it (on the same id)
        if 409 == response.status && params["overwrite"] == "true"
          result = Yajl::Parser.parse(response.body)
          if result["error"] == "conflict"
            doc = update_revision(Yajl::Parser.parse(data), id)
            response = writer.put(id, headers, doc.to_json)
          end
        end

        # Whether created directly or after overwrite, return the stored doc
        if 201 == response.status
          rev = response.headers["Etag"].to_s.gsub(/["]/, "") # revision from the Etag header
          query = rev.empty? ? {} : { "rev" => rev }
          created = writer.get(id, headers, query)
          response.body = created.body
        end
        [response.status, response.headers, response.body]
      end

      # Storage metadata (document count, data size, started, and (not) updated)
      # @return [Array] [status, headers, [json]]; status 501 when CouchDB
      #   does not answer 200
      def meta
        response = couch.get(read)

        if 200 == response.status
          couch_desc = Yajl::Parser.parse(response.body)
          body = {
            count: couch_desc["doc_count"] - design_document_count,
            size: couch_desc["data_size"],
            started: Time.at(couch_desc["instance_start_time"].to_i/1e6).utc.iso8601,
            updated: nil # Hard to know for CouchDB, especially if deleted
          }
          status = 200
        else
          body = { "error" => { "status" => response.status, "explanation" => "Storage error #{response.status}" } }
          status = 501
        end
        [status, {"Content-Type" => HEADERS["Content-Type"]}, [Yajl::Encoder.encode(body)+"\n"]] # Couch returns text/plain here!?
      end

      # Ids of all documents, excluding special "_xxx/" documents.
      # @return [Rack::Response] JSON {"ids": [...]}; status 501 on storage error
      def ids
        ids = []
        response = couch.get(all_docs_uri(false))

        if 200 == response.status
          rows = Yajl::Parser.parse(response.body)["rows"]
          ids = rows.reject { |row| row["id"] =~ /^_\w+\// }.map { |row| row["id"] }
          status = 200
        else
          status = 501
        end
        body = Yajl::Encoder.encode({ "ids" => ids })
        Rack::Response.new(body, status, {"Content-Type" => HEADERS["Content-Type"]})
      end

      # Feed of all (non-design) documents.
      # params["fields"]: nil => summary (title, id, _id, updated);
      # "*" => full documents; "a,b" => only those fields ("*" among them => all).
      # @return [Array] [status, headers, [json]]
      def feed(params={})
        response = couch.get(all_docs_uri(true))

        if 200 == response.status
          docs = Yajl::Parser.parse(response.body, :symbolize_keys => true)[:rows].map { |row| row[:doc] }.compact
          # Filter design docs BEFORE projecting, so they are excluded even
          # when "_id" is not among the requested fields
          docs = docs.reject { |doc| doc[:_id].to_s =~ DESIGN_DOCUMENT_REGEX }
          feed = project_feed(docs, params["fields"])
        else
          feed = { "error" => { "status" => response.status, "explanation" => "Storage error #{response.status}" } }
        end
        [response.status, {"Content-Type" => HEADERS["Content-Type"]}, [Yajl::Encoder.encode(feed)+"\n"]] # Couch returns text/plain here!?
      end

      #def parameter(params)
      #  ddoc = params.key?("view") ? params["view"] : "parameter"
      #  view(ddoc, params["parameter"], params)
      #end

      # Documents from the "parameter/bundle" view
      def bundle(params)
        view("parameter", "bundle", params)
      end

      # Query a CouchDB view; see
      # http://docs.couchdb.org/en/latest/api/ddoc/views.html#api-ddoc-view
      # All params are forwarded as query parameters; include_docs defaults to
      # "true". The caller's params hash is not mutated.
      # @return [Array] [status, headers, [json array of row docs]]
      def view(ddoc, map_fx, params={})
        query = params.dup
        query["include_docs"] = "true" unless query.key?("include_docs")

        uri = URI.parse("#{read}/_design/#{ddoc}/_view/#{map_fx}")
        uri.query = URI.encode_www_form(query.to_a)

        response = couch.get(uri)

        if 200 == response.status
          feed = Yajl::Parser.parse(response.body, :symbolize_keys => true)[:rows].map { |row| row[:doc] }
        else
          feed = { "error" => { "status" => response.status, "explanation" => "Storage error #{response.status}" } }
        end
        [response.status, {"Content-Type" => HEADERS["Content-Type"]}, [Yajl::Encoder.encode(feed)+"\n"]] # Couch returns text/plain here!?
      end

      # URI of the _all_docs resource of the read database.
      # @param include_docs [Boolean] only an explicit +false+ yields
      #   include_docs=false
      # @return [String]
      def all_docs_uri(include_docs=false)
        include_docs = (false == include_docs) ? "false" : "true"
        # Use a view, if it exists
        # /#{read}/_design/feed/_view/fields?keys=["id","title","updated"]

        # Otherwise, fallback to _all_docs
        "#{read}/_all_docs?include_docs=#{include_docs}&limit=#{limit}"
      end

      # Retrieve all docs matching requested ids.
      # @param data [String] JSON of the form { "keys" : [1, 2, 3, 4, 5] }
      # @param include_docs [Boolean] include full documents in the rows
      # @return [Array] [status, headers, body]
      def fetch_many(data, include_docs=false)
        uri = "#{read}/_all_docs"
        uri += "?include_docs=true" if include_docs
        response = reader.post(uri, headers, data)
        [response.status, response.headers, response.body]
      end

      # Fetch a document as a symbol-keyed Hash, or a single value from it.
      # @param key [String, Symbol, nil] when given, return only that value
      #   (nil if absent)
      # @raise [RuntimeError] when the GET does not return 200
      def fetch(id, key=nil)
        status, _headers, jsonstring = get(id)
        unless 200 == status
          raise RuntimeError, "#{self.class.name}#fetch status: #{status}"
        end
        hash = Yajl::Parser.new(:symbolize_keys => true).parse(jsonstring)
        key.nil? ? hash : hash[key.to_sym]
      end

      # Called by Core#valid? see https://github.com/npolar/api.npolar.no/blob/12e741d8897c8c5e9065a6ac283718e0b274b936/lib/npolar/api/core.rb#L290
      #
      # Checks JSON syntax, then (when a model is set) validates every
      # document with a fresh model instance.
      # @param data [String] JSON object or array of objects
      # @return [Boolean]; details in #errors ("JSON syntax error" string on a
      #   parse failure, otherwise an Array of { id => errors } hashes)
      def valid?(data, context="POST")
        @errors = []

        # First, check JSON syntax
        begin
          docs = string_matching?(data, JSON_ARRAY_REGEX) ? JSON.parse(data) : [JSON.parse(data)]
        rescue JSON::ParserError
          @errors = "JSON syntax error"
          return false
        end

        return true if model.nil?

        docs.each do |document|
          # @model already exists, but we need a new clean object
          m = model.class.new(document)
          next unless false == m.valid?(document)
          # Record every invalid document, even if the model exposes no errors
          details = m.respond_to?(:errors) ? m.errors : ["Invalid document"]
          @errors << { document["id"] => details }
        end
        @errors = @errors.flatten
        @errors.empty?
      end

      # @return [Array, String, nil] errors from the last #valid? call
      def errors
        @errors
      end


      protected

      # Raw couch client, use to get _documents (@see #ids)
      # NOTE(review): @uri is never assigned in this class, so callers pass
      # absolute URIs.
      def couch
        ::Rack::Client.new(@uri)
      end

      # Protected couch client, one per base URI (so reader and writer never
      # share a client pointing at the wrong database). A client injected via
      # #client= takes precedence.
      def client(uri)
        return @client unless @client.nil?
        @clients ||= {}
        @clients[uri] ||= ::Rack::Client.new(uri) do
          # Security feature: Disallow blank ids, and ids starting with _
          # Blank ids plus DELETE means bam! (deleting entire collection), the second could leak special CouchDB documents
          use Npolar::Rack::ValidateId
          run ::Rack::Client::Handler::NetHTTP
        end
      end

      # Look up the couch doc by +id+ (default doc["id"]) and copy its _rev
      # into doc. Returns doc (unchanged if the lookup is not 200).
      def update_revision(doc, id = doc["id"])
        response = get(id)
        if response[0] == 200
          doc["_rev"] = Yajl::Parser.parse(response[2])["_rev"]
        end
        doc
      end

      # Ids of written docs, parsed from a CouchDB bulk response
      def ids_from_response(response)
        Yajl::Parser.parse(response.body).map { |row| row['id'] }
      end

      # NOTE: singleton methods are not affected by `protected`; this is public.
      # Make "id" and "_id" agree: a symbol :id becomes "id"; "_id" wins over
      # "id"; with neither present a random UUID is used for both.
      def self.force_ids(doc)
        if doc.key?(:id) && !doc.key?("id")
          doc["id"] = doc[:id]
          doc.delete :id
        end

        if doc.key?("_id") && !doc["_id"].to_s.empty?
          doc["id"] = doc["_id"]
        elsif doc.key?("id") && !doc["id"].to_s.empty?
          doc["_id"] = doc["id"]
        else
          doc["_id"] = self.uuid(doc)
          doc["id"] = doc["_id"]
        end
        doc
      end

      # Random UUID for a new document
      def self.uuid(doc)
        UUIDTools::UUID.random_create
      end

      # Current row limit for _all_docs (LIMIT unless overridden)
      def limit
        @limit ||= LIMIT
      end

      def log
        @log ||= Npolar::Api.log
      end

      # Client bound to the read database
      def reader
        client(read+"/")
      end

      # Client bound to the write database
      def writer
        client(write+"/")
      end

      # @return [String] s without one trailing slash
      def strip_trailing_slash(s)
        s.gsub(/[\/]$/, "")
      end

      # Prefix a relative database name with Couch.uri when that is http(s)
      def absolute_uri(database)
        base = self.class.uri.to_s
        if database !~ /^http(s)?:\/\// && base =~ /^http(s)?:\/\//
          base + "/" + database
        else
          database
        end
      end

      # True when data is a String matching regex (never calls =~ on a Hash)
      def string_matching?(data, regex)
        data.is_a?(String) && !(data =~ regex).nil?
      end

      # Number of design documents in the read database (0 if not countable)
      def design_document_count
        response = couch.get(read+"/_all_docs?startkey=%22_design%2F%22&endkey=%22_design0%22&include_docs=false")
        return 0 unless 200 == response.status
        rows = Yajl::Parser.parse(response.body)["rows"]
        rows.nil? ? 0 : rows.size
      end

      # Apply the "fields" projection of #feed to symbol-keyed docs
      def project_feed(docs, fields)
        if fields.nil?
          docs.map { |doc| { :title => doc[:title], :id => doc[:id], :_id => doc[:_id], :updated => doc[:updated] } }
        elsif "*" == fields
          docs
        else
          wanted = fields.split(",").map { |f| f.to_sym }
          return docs if wanted.include?(:*)
          docs.map { |doc| doc.select { |k, _v| wanted.include?(k) } }
        end
      end

      # Split a _bulk_docs response body into [conflicting ids, all ids].
      # CouchDB reports request-level errors as an object, not an array;
      # that yields two empty lists.
      def bulk_ids(body)
        messages = Yajl::Parser.parse(body)
        return [[], []] unless messages.is_a?(Array)
        conflict_ids = messages.select { |msg| msg["error"] == "conflict" }.map { |msg| msg["id"] }
        [conflict_ids, messages.map { |msg| msg["id"] }]
      end

      # Re-post the conflicting docs with their current _rev (overwrite)
      def repost_with_current_revisions(docs, conflict_ids)
        docs_by_id = Hash[docs.map { |doc| [doc["id"], doc] }]
        _status, _headers, body = fetch_many({ "keys" => conflict_ids }.to_json, false)
        rows = Yajl::Parser.parse(body)["rows"] || []
        docs_to_repost = []
        rows.each do |info|
          doc = docs_by_id[info["id"]]
          # Rows for missing docs carry no "value"
          next if doc.nil? || info["value"].nil?
          doc["_rev"] = info["value"]["rev"]
          docs_to_repost << doc
        end
        writer.post("_bulk_docs", headers, { "docs" => docs_to_repost }.to_json)
      end

      # CouchDB's "reason" from an error body, or nil
      def couch_error_reason(body)
        parsed = JSON.parse(body)
        parsed.is_a?(Hash) ? parsed["reason"] : nil
      end

      # Bulk POST via _bulk_docs. Any document conflict yields status 409,
      # unless params["overwrite"] == "true", in which case conflicting docs
      # are re-posted with their current revisions.
      # @raise [ArgumentError] unless data is a JSON array string
      # @return [Array] [status, headers, [json summary]]
      def post_many(data, params={})
        unless string_matching?(data, JSON_ARRAY_REGEX)
          raise ArgumentError, "Please provide data as JSON Array"
        end
        t0 = Time.now

        # parse docs and make sure we have 'id' and '_id' set
        docs = Yajl::Parser.parse(data).map { |doc| self.class.force_ids(doc) }

        # try to post them all
        response = writer.post("_bulk_docs", headers, { "docs" => docs }.to_json)

        conflict_ids, couch_ids = bulk_ids(response.body)
        status = conflict_ids.empty? ? response.status : 409

        if params["overwrite"] == "true" && !conflict_ids.empty?
          response = repost_with_current_revisions(docs, conflict_ids)
          status = response.status
        end

        elapsed = Time.now - t0
        size = docs.size

        if 201 == status
          summary = "Posted #{size} CouchDB documents in #{elapsed} seconds"
          rk = "response"
          explanation = "CouchDB success"
        elsif 409 == status
          summary = "document write conflict"
          explanation = "CouchDB error"
          rk = "error"
        else
          summary = couch_error_reason(response.body)
          explanation = "CouchDB error"
          rk = "error"
        end

        # Guard: Infinity cannot be serialized to JSON
        qps = elapsed > 0 ? size / elapsed : 0
        body = { rk => { "status" => status,
            # "uri" => "",
            "ids" => couch_ids, # provide these so solrizer can use them
            "summary" => summary, "explanation" => explanation, "system" => response.headers["Server"] },
          "method" => "", "qps" => qps }
        [status, {"Content-Type" => HEADERS["Content-Type"]}, [body.to_json+"\n"]]
      end

    end
  end
end
# http://theschemeway.blogspot.no/2011/02/securing-couchdb-database.html
# SSL only write?
# http://blog.couchbase.com/what%E2%80%99s-new-couchdb-10-%E2%80%94-part-4-security%E2%80%99n-stuff-users-authentication-authorisation-and-permissions
# Authorization: AWS + KeyId + : + base64(hmac-sha1(VERB + CONTENT-MD5 + CONTENT-TYPE + DATE + …))
#http://dagi3d.net/
# http://dagi3d.net/posts/5-api-authentication
      #if body.respond_to? :body and body.body.respond_to? :force_encoding
      #  body = body.body.force_encoding("UTF-8")
      #end