rapid7/metasploit-framework

View on GitHub
lib/msf/core/exploit/git/packfile.rb

Summary

Maintainability
D
2 days
Test Coverage
module Msf

module Exploit::Git

  SIGNATURE = 'PACK'
  VERSION = 2

  ##
  # object types
  ##
  OBJ_COMMIT    = 1
  OBJ_TREE      = 2
  OBJ_BLOB      = 3
  OBJ_TAG       = 4
  # ?           = 5
  # type 5 is reserved
  # see: https://git-scm.com/docs/pack-format
  OBJ_OFS_DELTA = 6
  OBJ_REF_DELTA = 7

  class Packfile

    attr_reader :version, :git_objects, :data, :checksum

    def initialize(version = nil, objs)
      @version = version.nil? ? VERSION : version.to_i
      @git_objects = objs.kind_of?(Array) ? objs : [ objs ]

      pre_check_data = header + format_objects
      @checksum = Digest::SHA1.hexdigest(pre_check_data)
      @data = pre_check_data + [ @checksum ].pack('H*')
    end

    def header
      SIGNATURE + [ @version ].pack('N') + [ @git_objects.length ].pack('N')
    end

    # Each object has a variable-sized
    # header, with the size being determined
    # by the length of the object's original,
    # uncompressed content
    def format_objects
      type = 0
      obj_stream = []

      @git_objects.each do |obj|
        byte_amt = 1
        obj_data_size = obj.content.length
        case obj.type          
        when 'blob'
          type = OBJ_BLOB
        when 'tree'
          type = OBJ_TREE
        when 'commit'
          type = OBJ_COMMIT
        end
        
        num_bits = 0
        num = obj_data_size
        while num != 0
          num /= 2
          num_bits += 1
        end

        # the first byte can only hold
        # four bits of the size of the
        # object's content since the
        # leading bits are reserved for
        # value of MSB and object type
        if num_bits > 4
          if num_bits > 11
            byte_amt = num_bits / 7
            byte_amt += 1 if (num_bits % 7 > 0)
          else
            byte_amt = 2
          end
        end

        shift = 0
        (1..byte_amt).each do |byte|
          curr_byte = 0
          # set msb if needed
          if byte < byte_amt
            curr_byte |= 128
          end

          # set the object type
          # set last four bits for content size
          if byte == 1
            curr_byte |= (type << 4)
            curr_byte |= (obj_data_size & 15)
          else
            curr_byte = (obj_data_size >> 4 >> shift) & 127
            shift += 7
          end

          obj_stream << [ curr_byte ].pack('C*')
        end

        # Since the object type is denoted in the preceding
        # info, we only store the compressed object data
        obj_stream << Rex::Text.zlib_deflate(obj.content, Zlib::DEFAULT_COMPRESSION)
      end

      obj_stream = obj_stream.join
    end

    # Read the contents of the packfile and constructs
    # the objects found
    # @param [ String ] the packfile data
    # return Array of GitObjects found in the packfile
    def self.read_packfile(data)
      return nil unless data
      return nil if data.empty?

      pack_start = data.index('PACK')
      return nil unless pack_start

      data = data[pack_start..-1]
      version = data[4..7].unpack('N').first
      obj_count = data[8..11].unpack('N').first
      curr_pos = 12

      type = ''
      pack_objs = []
      (1..obj_count).each do |obj_index|
        # determine the current object's type first
        first_byte = data[curr_pos].unpack('C').first
        num_type = (first_byte & 0b01110000) >> 4
        case num_type
        when OBJ_COMMIT
          type = 'commit'
        when OBJ_TREE
          type = 'tree'
        when OBJ_BLOB
          type = 'blob'
        when OBJ_OFS_DELTA
          type = 'ofs-delta'
        when OBJ_REF_DELTA
          type = 'ref-delta'
        end

        # now determine the size of the object's uncompressed data
        shift = 4
        curr_byte = first_byte
        size = curr_byte & 0b00001111
        keep_reading = false
        if curr_byte >= 128
          keep_reading = true
        end

        curr_pos += 1
        while keep_reading
          curr_byte = data[curr_pos].unpack('C').first
          if curr_byte < 128
            keep_reading = false
          end

          size = (curr_byte << shift) | size
          shift += 7
          curr_pos += 1
        end

        # now decompress content and create Git object
        case type
        when 'ofs-delta'
          # get negative offset
          offset, curr_pos = get_variable_len_num(data, curr_pos)
          base_start = curr_pos - offset
          base_obj_sha = data[base_start..base_start+19].unpack('H*').first
        when 'ref-delta'
          base_obj_sha = data[curr_pos..curr_pos+19].unpack('H*').first
          curr_pos += 20
        end

        content = Rex::Text.zlib_inflate(data[curr_pos..-1])

        # delta objects are object types specific to packfile
        # and do not follow same format as other Git objects
        if type == 'ofs-delta' || type == 'ref-delta'
          delta_obj = read_delta(type, content, base_obj_sha)
          pack_objs << apply_delta(delta_obj, pack_objs)
        else
          sha1, compressed = GitObject.build_object(type, content)
          pack_objs << GitObject.new(type, content, sha1, compressed)
        end

        # update curr_pos to point to next obj header
        compressed_len = Rex::Text.zlib_deflate(content, Zlib::DEFAULT_COMPRESSION).length
        curr_pos = curr_pos + compressed_len
      end

      pack_objs
    end

    def self.read_delta(type, content, base_obj_sha)
      source_len = 0
      target_len = 0

      delta = { type: type, base: base_obj_sha }

      start = 0
      base_len, start = get_variable_len_num(content, start)
      target_len, start = get_variable_len_num(content, start)

      inst_type = ''
      inst = content[start].unpack('C').first
      start += 1
      num_bytes = 0
      if inst >= 128
        inst_type = 'copy'
        # now determine the offset
        shift = 0
        offset_mask = []
        off_bits = inst & 0b1111
        (0..3).each do |idx|
          if (off_bits >> idx) & 1 == 1
            num_bytes += 1
            offset_mask.prepend(0b11111111)
          else
            offset_mask.prepend(0b00000000)
          end
        end

        offset = 0
        unless num_bytes == 0
          shift = 0
          byte_idx = 0
          off_bytes = content[start].unpack("C#{num_bytes}")

          (0..3).each do |idx|
            if offset_mask[3 - idx] == 255
              offset |= ((off_bytes[byte_idx] & offset_mask[3 - idx]) << shift)
              byte_idx += 1
            else
              offset |= (0 << shift)
            end
            shift += 7
          end
        end

        delta[:offset] = offset
        size = 0
        num_bytes = 0
        size_mask = []
        size_bits = (inst & 0b01110000) >> 4
        start += num_bytes
        if size_bits == 0
          size = 0x10000
        else
          (0..2).each do |idx|
            if (size_bits >> idx) & 1 == 1
              num_bytes += 1
              size_mask.prepend(0b11111111)
            else
              size_mask.prepend(0b00000000)
            end
          end

          shift = 0
          byte_num = 0
          size_bytes = content[start].unpack("C#{num_bytes}")
          start += num_bytes
          (0..2).each do |idx|
            if size_mask[2 - idx] == 255
              size |= ((size_bytes[byte_num] & size_mask[2 - idx]) << shift)
              byte_num += 1
            else
              size |= (0 << shift)
            end
            shift += 7
          end
        end
      else
        inst_type = 'insert'
        size = inst & 0b0111111
        delta[:data] = content[start..start + size - 1]
      end
      delta[:size] = size
      delta[:inst] = inst_type

      delta
    end

    def self.apply_delta(delta, git_objects)
      target = nil

      case delta[:inst]
      when 'copy'
        base_obj = GitObject.find_object(delta[:base], git_objects)
        return nil unless base_obj

        offset = delta[:offset]
        size = delta[:size]
        type = base_obj.type

        content = base_obj.content
        content = content[offset..offset + size - 1]
        sha1, compressed = GitObject.build_object(type, content)
        target = GitObject.new(type, content, sha1, compressed)
      when 'insert'
        size = delta[:size]
        base_obj = GitObject.find_object(delta[:base], git_objects)
        type = base_obj.type
        sha1, compressed = GitObject.build_object(type, delta[:data])
        target = GitObject.new(type, delta[:data], sha1, compressed)
      end

      target
    end

    def self.get_variable_len_num(data, curr_pos)
      shift = 7
      curr_byte = data[curr_pos].unpack('C').first
      offset = curr_byte & 0b01111111
      curr_pos += 1

      while curr_byte >= 128
        curr_byte = data[curr_pos].unpack('C').first
        offset = (offset << shift) | (curr_byte & 0b01111111)
        shift += 7
        curr_pos += 1
      end
      new_pos = curr_pos
    
      return offset, new_pos
    end
  end
end
end