rapid7/metasploit-framework

View on GitHub
lib/snmp/varbind.rb

Summary

Maintainability
C
7 hrs
Test Coverage
#
# Copyright (c) 2004 David R. Halliday
# All rights reserved.
#
# This SNMP library is free software.  Redistribution is permitted under the
# same terms and conditions as the standard Ruby distribution.  See the
# COPYING file in the Ruby distribution for details.
#

require 'snmp/ber'

include SNMP::BER

module SNMP

##
# Raised by VarBind.decode_value when the tag of an encoded value has no
# entry in the decoder map.
#
class UnsupportedValueTag < RuntimeError
end
##
# Raised by IpAddress when the constructor argument is neither four octets
# nor a valid dotted string.
#
class InvalidIpAddress < ArgumentError
end

##
# An Array of varbinds that can be encoded to, and decoded from, a single
# sequence.
#
class VarBindList < Array
    ##
    # Decodes a sequence from the front of +data+ and then decodes varbinds
    # from its contents until nothing is left.  Returns the list together
    # with whatever followed the sequence in +data+.
    #
    def self.decode(data)
        pending, remainder = decode_sequence(data)
        decoded = VarBindList.new
        until pending == ""
            varbind, pending = VarBind.decode(pending)
            decoded << varbind
        end
        [decoded, remainder]
    end

    ##
    # Accepts a single item or an enumerable of items.  Anything that
    # responds to +to_str+ is treated as a dotted object id; anything else
    # is converted with +to_varbind+.
    #
    def initialize(varbind_list=[])
        super()
        convert = lambda do |item|
            if item.respond_to?(:to_str)
                ObjectId.new(item.to_str).to_varbind
            else
                item.to_varbind
            end
        end
        single = varbind_list.respond_to?(:to_str) ||
                 varbind_list.respond_to?(:to_varbind)
        if single
            self << convert.call(varbind_list)
        else
            varbind_list.each { |item| self << convert.call(item) }
        end
    end

    def asn1_type
        "VarBindList"
    end

    ##
    # Concatenates the encoding of every element and wraps the result in a
    # sequence.
    #
    def encode
        payload = each_with_object("") { |varbind, buffer| buffer << varbind.encode }
        encode_sequence(payload)
    end
end

##
# SNMP INTEGER value.  Stores the result of calling +to_i+ on the
# constructor argument and is ordered by that stored value.
#
# Inside this module the bare name Integer refers to this class, not to
# Ruby's built-in Integer.
#
class Integer
    include Comparable

    ##
    # Builds an Integer from encoded value data using decode_integer_value.
    #
    def self.decode(value_data)
        Integer.new(decode_integer_value(value_data))
    end

    def asn1_type
        "INTEGER"
    end

    def initialize(value)
        @value = value.to_i
    end

    ##
    # Compares the stored value with +other.to_i+.  Returns nil when +other+
    # has no +to_i+, so that == against such an object is false instead of
    # raising NoMethodError.
    #
    def <=>(other)
        return nil unless other.respond_to? :to_i
        @value <=> other.to_i
    end

    ##
    # Supports arithmetic where a native number is the left operand.
    # Native integers keep integer arithmetic; everything else is converted
    # to floats.  The leading :: is required because a bare Integer here
    # would name this class.
    #
    def coerce(other)
        if other.kind_of? ::Integer
            [other, @value]
        else
            [other.to_f, self.to_f]
        end
    end

    def to_s
        @value.to_s
    end

    def to_i
        @value
    end

    def to_f
        @value.to_f
    end

    def encode
        encode_integer(@value)
    end

    ##
    # Returns a one-element ObjectId holding the stored value.
    # Raises RangeError when the value is negative.
    #
    def to_oid
        raise RangeError, "#{@value} cannot be an OID (must be >= 0)" if @value < 0
        ObjectId.new([@value])
    end
end

##
# Integer restricted to the signed 32-bit range.
#
class Integer32 < Integer
    ##
    # Raises ArgumentError when +value+ is below -2147483648 or above
    # 2147483647.
    #
    def initialize(value)
        super(value)
        if value < -2_147_483_648 || value > 2_147_483_647
            raise ArgumentError, "Out of range: #{value}"
        end
    end
end

##
# A String subclass representing an OCTET STRING value.
#
class OctetString < String
    class << self
        ##
        # Wraps the given value data in an OctetString.
        #
        def decode(value_data)
            OctetString.new(value_data)
        end
    end

    def asn1_type
        "OCTET STRING"
    end

    def encode
        encode_octet_string(self)
    end

    ##
    # Returns an ObjectId with one sub-identifier per byte of this string.
    #
    def to_oid
        bytes.each_with_object(ObjectId.new) { |byte, oid| oid << byte }
    end
end

##
# An object identifier, stored as an Array of integer sub-identifiers.
#
class ObjectId < Array
    include Comparable

    def self.decode(value_data)
        ObjectId.new(decode_object_id_value(value_data))
    end

    def asn1_type
        "OBJECT IDENTIFIER"
    end

    ##
    # Create an object id.  The input is expected to be either a string
    # in the format "n.n.n.n.n.n" or an array of integers.
    #
    # Raises ArgumentError for nil or for any element that cannot be
    # converted to an integer.
    #
    def initialize(id=[])
        raise ArgumentError if id.nil?
        parts = id.respond_to?(:to_str) ? id.to_str.split(".") : id.to_ary
        super(make_integers(parts))
    rescue ArgumentError
        # Re-raise with a message that identifies the offending input.
        raise ArgumentError, "#{id.inspect}:#{id.class} not a valid object ID"
    end

    ##
    # Returns a VarBind naming this object id, with Null as its value.
    #
    def to_varbind
        VarBind.new(self, Null)
    end

    def to_oid
        self
    end

    def to_s
        join('.')
    end

    def inspect
        "[#{self}]"
    end

    def encode
        encode_object_id(self)
    end

    ##
    # Returns true if this ObjectId is a subtree of the provided parent tree
    # ObjectId.  For example, "1.3.6.1.5" is a subtree of "1.3.6.1".
    #
    def subtree_of?(parent_tree)
        parent = make_object_id(parent_tree)
        return false if parent.length > length
        parent.each_with_index.all? { |sub_id, position| sub_id == self[position] }
    end

    ##
    # Returns an index based on the difference between this ObjectId
    # and the provided parent ObjectId.
    #
    # For example, ObjectId.new("1.3.6.1.5").index("1.3.6.1") returns an
    # ObjectId of "5".
    #
    # Raises ArgumentError when this id is not below +parent_tree+ or is
    # the same length as it.
    #
    def index(parent_tree)
        parent = make_object_id(parent_tree)
        unless subtree_of?(parent)
            raise ArgumentError, "#{self} not a subtree of #{parent}"
        end
        raise ArgumentError, "OIDs are the same" if length == parent.length
        ObjectId.new(drop(parent.length))
    end

    private

    # Converts every element with Kernel#Integer, which raises on bad input.
    def make_integers(list)
        list.map { |element| Integer(element) }
    end

    # Passes ObjectId instances through and wraps anything else.
    def make_object_id(oid)
        return oid if oid.kind_of?(ObjectId)
        ObjectId.new(oid)
    end

end

##
# An IPv4 address held internally as a raw four-octet string.
#
class IpAddress
    class << self
        def decode(value_data)
            IpAddress.new(value_data)
        end
    end

    def asn1_type
        "IpAddress"
    end

    ##
    # Create an IpAddress object.  The constructor accepts either a raw
    # four-octet string or a formatted string of integers separated by dots
    # (i.e. "10.1.2.3").
    #
    # Raises InvalidIpAddress for any other input.
    #
    def initialize(value_data)
        ip = value_data.to_str
        # Measure in bytes, not characters: four raw octets may form fewer
        # than four characters in a multibyte-encoded string.
        if ip.bytesize > 4
            ip = parse_string(ip)
        elsif ip.bytesize != 4
            raise InvalidIpAddress, "Expected 4 octets or formatted string, got #{value_data.inspect}"
        end
        @value = ip
    end

    ##
    # Returns a raw four-octet string representing this IpAddress.
    #
    def to_str
        @value.dup
    end

    ##
    # Returns a formatted, dot-separated string representing this IpAddress.
    #
    def to_s
        @value.bytes.join('.')
    end

    ##
    # Returns an ObjectId with one sub-identifier per octet.
    #
    def to_oid
        oid = ObjectId.new
        @value.each_byte { |b| oid << b }
        oid
    end

    ##
    # Equal to anything whose +to_str+ matches the raw octets.
    #
    def ==(other)
        return false unless other.respond_to? :to_str
        @value.eql?(other.to_str)
    end

    def eql?(other)
        self == other
    end

    def hash
        @value.hash
    end

    def encode
        encode_tlv(IpAddress_TAG, @value)
    end

    private

    ##
    # Converts a dotted string into raw octets.  Each part must be a
    # base-10 integer between 0 and 255; parts that are empty or contain
    # non-numeric text are rejected rather than silently read as 0.
    #
    def parse_string(ip_string)
        parts = ip_string.split(".")
        raise InvalidIpAddress, ip_string.inspect if parts.length != 4
        value_data = ""
        parts.each do |s|
            begin
                octet = Integer(s, 10)
            rescue ArgumentError
                raise InvalidIpAddress, ip_string.inspect
            end
            raise InvalidIpAddress, ip_string.inspect if octet > 255
            raise InvalidIpAddress, ip_string.inspect if octet < 0
            value_data << octet.chr
        end
        value_data
    end

end

##
# Integer restricted to the range 0..4294967295.  Base class for the
# unsigned 32-bit value types.
#
class UnsignedInteger < Integer
    ##
    # Decodes the value data with decode_uinteger_value and instantiates the
    # receiving class, so subclasses decode to their own type.
    #
    def self.decode(value_data)
        new(decode_uinteger_value(value_data))
    end

    ##
    # Raises ArgumentError when +value+ is negative or above 4294967295.
    #
    def initialize(value)
        super(value)
        raise ArgumentError, "Negative integer invalid: #{value}" if value < 0
        raise ArgumentError, "Out of range: #{value}" if value > 4_294_967_295
    end
end

##
# Unsigned 32-bit value encoded with Counter32_TAG.
#
class Counter32 < UnsignedInteger
    # Encodes the stored value as a tagged integer.
    def encode
        encode_tagged_integer(Counter32_TAG, @value)
    end

    def asn1_type
        'Counter32'
    end
end

##
# Unsigned 32-bit value encoded with Gauge32_TAG.
#
class Gauge32 < UnsignedInteger
    # Encodes the stored value as a tagged integer.
    def encode
        encode_tagged_integer(Gauge32_TAG, @value)
    end

    def asn1_type
        'Gauge32'
    end
end

##
# Unsigned 32-bit value encoded with Unsigned32_TAG.
#
class Unsigned32 < UnsignedInteger
    # Encodes the stored value as a tagged integer.
    def encode
        encode_tagged_integer(Unsigned32_TAG, @value)
    end

    def asn1_type
        'Unsigned32'
    end
end

##
# Unsigned 32-bit value encoded with TimeTicks_TAG and displayed as a
# duration.
#
class TimeTicks < UnsignedInteger
    def asn1_type
        "TimeTicks"
    end

    def encode
        encode_tagged_integer(TimeTicks_TAG, @value)
    end

    ##
    # Formats the stored value, counted in hundredths of a second, as
    # "HH:MM:SS.hh", prefixed with "1 day, " or "N days, " once it reaches
    # a full day.
    #
    def to_s
        total_seconds, hundredths = @value.divmod(100)
        total_minutes, seconds = total_seconds.divmod(60)
        total_hours, minutes = total_minutes.divmod(60)
        days, hours = total_hours.divmod(24)
        if days < 1
            sprintf('%02d:%02d:%02d.%02d',
                    hours, minutes, seconds, hundredths)
        elsif days == 1
            sprintf('1 day, %02d:%02d:%02d.%02d',
                    hours, minutes, seconds, hundredths)
        elsif days > 1
            sprintf('%d days, %02d:%02d:%02d.%02d',
                    days, hours, minutes, seconds, hundredths)
        end
    end
end

##
# OctetString variant encoded with Opaque_TAG.
#
class Opaque < OctetString
    class << self
        # Wraps the given value data in an Opaque.
        def decode(value_data)
            Opaque.new(value_data)
        end
    end

    def asn1_type
        'Opaque'
    end

    # Encodes the string contents as a TLV under Opaque_TAG.
    def encode
        encode_tlv(Opaque_TAG, self)
    end
end

##
# Integer restricted to the range 0..18446744073709551615, encoded with
# Counter64_TAG.
#
class Counter64 < Integer
    ##
    # Decodes with decode_uinteger_value, as UnsignedInteger.decode does.
    # The constructor rejects negative values, so the signed
    # decode_integer_value would be unsuitable for an encoding whose leading
    # bit is set.
    # NOTE(review): assumes decode_uinteger_value never returns a negative
    # number; confirm against snmp/ber.
    #
    def self.decode(value_data)
        Counter64.new(decode_uinteger_value(value_data))
    end

    def asn1_type
        "Counter64"
    end

    ##
    # Raises ArgumentError when +value+ is negative or above
    # 18446744073709551615.
    #
    def initialize(value)
        super(value)
        raise ArgumentError, "Negative integer invalid: #{value}" if value < 0
        raise ArgumentError, "Out of range: #{value}" if value > 18446744073709551615
    end

    def encode
        encode_tagged_integer(Counter64_TAG, @value)
    end
end

##
# The Null value.  The class itself is used as the value; every method is a
# class method.
#
class Null
    # Ignores the value data and returns the Null class.
    def self.decode(value_data)
        Null
    end

    def self.encode
        encode_null
    end

    def self.asn1_type
        'Null'
    end

    # Same text as asn1_type.
    def self.to_s
        asn1_type
    end
end

##
# The noSuchObject value.  The class itself is used as the value; every
# method is a class method.
#
class NoSuchObject
    # Ignores the value data and returns the NoSuchObject class.
    def self.decode(value_data)
        NoSuchObject
    end

    def self.encode
        encode_exception(NoSuchObject_TAG)
    end

    def self.asn1_type
        'noSuchObject'
    end

    # Same text as asn1_type.
    def self.to_s
        asn1_type
    end
end

##
# The noSuchInstance value.  The class itself is used as the value; every
# method is a class method.
#
class NoSuchInstance
    # Ignores the value data and returns the NoSuchInstance class.
    def self.decode(value_data)
        NoSuchInstance
    end

    def self.encode
        encode_exception(NoSuchInstance_TAG)
    end

    def self.asn1_type
        'noSuchInstance'
    end

    # Same text as asn1_type.
    def self.to_s
        asn1_type
    end
end

##
# The endOfMibView value.  The class itself is used as the value; every
# method is a class method.
#
class EndOfMibView
    # Ignores the value data and returns the EndOfMibView class.
    def self.decode(value_data)
        EndOfMibView
    end

    def self.encode
        encode_exception(EndOfMibView_TAG)
    end

    def self.asn1_type
        'endOfMibView'
    end

    # Same text as asn1_type.
    def self.to_s
        asn1_type
    end
end

##
# A name/value pair: an object id together with the value bound to it.
#
class VarBind
    attr_accessor :name, :value

    alias :oid :name

    class << self
        ##
        # Decodes one varbind sequence from the front of +data+.  Returns
        # the VarBind and the data that followed the sequence.
        #
        def decode(data)
            body, trailing = decode_sequence(data)
            decoded_name, rest = decode_object_id(body)
            decoded_value, rest = decode_value(rest)
            # The sequence must contain exactly a name and a value.
            assert_no_remainder(rest)
            [VarBind.new(decoded_name, decoded_value), trailing]
        end

        # Maps each value tag to the class whose +decode+ handles it.
        ValueDecoderMap = {
            INTEGER_TAG           => Integer,
            OCTET_STRING_TAG      => OctetString,
            NULL_TAG              => Null,
            OBJECT_IDENTIFIER_TAG => ObjectId,
            IpAddress_TAG         => IpAddress,
            Counter32_TAG         => Counter32,
            Gauge32_TAG           => Gauge32,
            # note Gauge32 tag same as Unsigned32
            TimeTicks_TAG         => TimeTicks,
            Opaque_TAG            => Opaque,
            Counter64_TAG         => Counter64,
            NoSuchObject_TAG      => NoSuchObject,
            NoSuchInstance_TAG    => NoSuchInstance,
            EndOfMibView_TAG      => EndOfMibView
        }

        ##
        # Decodes one TLV from the front of +data+ and hands its contents
        # to the class registered for its tag.  Returns the decoded value
        # and the remaining data.  Raises UnsupportedValueTag for a tag
        # with no registered class.
        #
        def decode_value(data)
            tag, payload, rest = decode_tlv(data)
            decoder = ValueDecoderMap[tag]
            raise UnsupportedValueTag, tag.to_s unless decoder
            [decoder.decode(payload), rest]
        end
    end

    ##
    # +name+ is kept as given when it is already an ObjectId; otherwise it
    # is wrapped in an ObjectName.
    #
    def initialize(name, value=Null)
        @name = name.kind_of?(ObjectId) ? name : ObjectName.new(name)
        @value = value
    end

    def asn1_type
        "VarBind"
    end

    def to_varbind
        self
    end

    def to_s
        "[name=#{@name}, value=#{@value} (#{@value.asn1_type})]"
    end

    # Yields this varbind once, so a single VarBind can be iterated like a
    # list.
    def each
        yield self
    end

    ##
    # Encodes the name followed by the value, wrapped in a sequence.
    #
    def encode
        encode_sequence(encode_object_id(@name) << value.encode)
    end
end

##
# An ObjectId used as the name of a VarBind.  Differs from ObjectId only in
# the type name it reports.
#
class ObjectName < ObjectId
    def asn1_type
        'ObjectName'
    end
end

end