protofuzz/protofuzz.py
"""The entry points to the protofuzz module.
Usage:
>>> message_fuzzers = protofuzz.from_description_string('''
... message Address {
... required int32 house = 1;
... required string street = 2;
... }
... ''')
>>> for fuzzer in message_fuzzers.values():
... for obj in fuzzer.permute():
... print("Generated object: {}".format(obj))
...
Generated object: house: -1
street: "!"
Generated object: house: 0
street: "!"
Generated object: house: 256
street: "!"
(etc)
"""
# FIXME: Package containing module 'google' is not listed in project requirements
from google.protobuf import descriptor as D
from google.protobuf import message
from google.protobuf.internal import containers
from protofuzz import pbimport, gen, values
# Public API of this module: the names exported by a star-import.
# Note that descriptor_to_generator is defined below without a leading
# underscore but is not listed here.
__all__ = [
"ProtobufGenerator",
"from_file",
"from_description_string",
"from_protobuf_class",
]
def _int_generator(descriptor, bitwidth, unsigned):
    """Build a value generator of fuzzed integers for an integer field.

    Args:
        descriptor: Field descriptor; its ``name`` labels the generator.
        bitwidth: Integer width, forwarded to ``values.get_integers``.
        unsigned: Signedness flag, forwarded to ``values.get_integers``.

    Returns:
        A ``gen.IterValueGenerator`` over the materialized integer values.
    """
    candidates = [*values.get_integers(bitwidth, unsigned)]
    return gen.IterValueGenerator(descriptor.name, candidates)
def _string_generator(descriptor, max_length=0, limit=0):
    """Build a value generator of fuzzed strings for a string field.

    Args:
        descriptor: Field descriptor; its ``name`` labels the generator.
        max_length: Forwarded unchanged to ``values.get_strings``.
        limit: Forwarded unchanged to ``values.get_strings``.

    Returns:
        A ``gen.IterValueGenerator`` over the materialized string values.
    """
    candidates = [*values.get_strings(max_length, limit)]
    return gen.IterValueGenerator(descriptor.name, candidates)
def _bytes_generator(descriptor, max_length=0, limit=0):
    """Build a value generator of fuzzed byte strings for a bytes field.

    The values are the strings produced by ``values.get_strings``, each
    encoded as UTF-8.

    Args:
        descriptor: Field descriptor; its ``name`` labels the generator.
        max_length: Forwarded unchanged to ``values.get_strings``.
        limit: Forwarded unchanged to ``values.get_strings``.

    Returns:
        A ``gen.IterValueGenerator`` over the encoded values.
    """
    encoded = [
        bytes(text, "utf-8")
        for text in values.get_strings(max_length, limit)
    ]
    return gen.IterValueGenerator(descriptor.name, encoded)
def _float_generator(descriptor, bitwidth):
    """Build a value generator of fuzzed floats for a float/double field.

    Args:
        descriptor: Field descriptor; its ``name`` labels the generator.
        bitwidth: Float width, forwarded to ``values.get_floats``.

    Returns:
        A ``gen.IterValueGenerator`` over the materialized float values.
    """
    # Materialize the values the same way the integer/string/bytes helpers
    # do. If values.get_floats returns a single-pass iterator, handing it
    # over directly would let it be consumed only once; a list can be
    # replayed every time the generator is restarted.
    vals = list(values.get_floats(bitwidth))
    return gen.IterValueGenerator(descriptor.name, vals)
def _enum_generator(descriptor):
    """Build a value generator over the numeric values of an enum field.

    Args:
        descriptor: Field descriptor of an enum field; the numbers are read
            from ``descriptor.enum_type.values_by_number``.

    Returns:
        A ``gen.IterValueGenerator`` over the enum's declared numbers.
    """
    # Snapshot the keys into a list so this helper hands the generator the
    # same kind of container as the integer/string/bytes helpers, rather
    # than a live keys view that cannot be indexed or sliced.
    vals = list(descriptor.enum_type.values_by_number.keys())
    return gen.IterValueGenerator(descriptor.name, vals)
def _prototype_to_generator(descriptor, cls):
    """Return the protofuzz generator that matches one field descriptor.

    Args:
        descriptor: A protobuf field descriptor; its ``type`` selects the
            kind of generator and its ``name`` labels it.
        cls: Composite generator class, used only for nested message
            fields (forwarded to ``descriptor_to_generator``).

    Returns:
        A generator producing fuzzed values for the field.

    Raises:
        RuntimeError: If the field type is none of the handled types.
    """
    _fd = D.FieldDescriptor

    signed_32 = (_fd.TYPE_INT32, _fd.TYPE_SFIXED32, _fd.TYPE_SINT32)
    unsigned_32 = (_fd.TYPE_UINT32, _fd.TYPE_FIXED32)
    signed_64 = (_fd.TYPE_INT64, _fd.TYPE_SFIXED64, _fd.TYPE_SINT64)
    unsigned_64 = (_fd.TYPE_UINT64, _fd.TYPE_FIXED64)

    field_type = descriptor.type

    # Integer fields: choose the width and signedness from the type group.
    is_64_bit = field_type in signed_64 + unsigned_64
    if is_64_bit or field_type in signed_32 + unsigned_32:
        bitwidth = 64 if is_64_bit else 32
        unsigned = field_type in unsigned_32 + unsigned_64
        return _int_generator(descriptor, bitwidth, unsigned)

    if field_type == _fd.TYPE_DOUBLE:
        return _float_generator(descriptor, 64)
    if field_type == _fd.TYPE_FLOAT:
        return _float_generator(descriptor, 32)
    if field_type == _fd.TYPE_STRING:
        return _string_generator(descriptor)
    if field_type == _fd.TYPE_BYTES:
        return _bytes_generator(descriptor)
    if field_type == _fd.TYPE_BOOL:
        return gen.IterValueGenerator(descriptor.name, [True, False])
    if field_type == _fd.TYPE_ENUM:
        return _enum_generator(descriptor)
    if field_type == _fd.TYPE_MESSAGE:
        # Nested message: build a composite generator for the sub-message
        # and rename it after the field that holds it.
        nested = descriptor_to_generator(descriptor.message_type, cls)
        nested.set_name(descriptor.name)
        return nested

    raise RuntimeError("type {} unsupported".format(field_type))
def descriptor_to_generator(cls_descriptor, cls, limit=0):
    """Build a composite protofuzz generator for a message descriptor.

    One generator is created per entry of ``cls_descriptor.fields_by_name``
    and all of them are combined by calling ``cls``.

    Args:
        cls_descriptor: Message descriptor providing ``name`` and
            ``fields_by_name``.
        cls: Callable invoked as ``cls(name, *field_generators)``.
        limit: When non-zero, passed to ``set_limit`` on every field
            generator.

    Returns:
        Whatever ``cls`` returns for the message name and field generators.
    """
    field_generators = []
    apply_limit = limit != 0
    for field in cls_descriptor.fields_by_name.values():
        field_generator = _prototype_to_generator(field, cls)
        if apply_limit:
            field_generator.set_limit(limit)
        field_generators.append(field_generator)
    return cls(cls_descriptor.name, *field_generators)
def _assign_to_field(obj, name, val):
    """Store ``val`` into the field ``name`` of the protobuf object ``obj``.

    How the value is stored depends on what the field currently holds:
    repeated scalar containers get the value appended, repeated composite
    containers get a new element that copies the value, plain scalars are
    set with ``setattr``, and sub-messages copy the value in place.

    Args:
        obj: The object whose field is assigned.
        name: Name of the field on ``obj``.
        val: The value to store.

    Raises:
        RuntimeError: If the field's current value is of no handled type.
    """
    current = getattr(obj, name)

    if isinstance(current, containers.RepeatedScalarFieldContainer):
        current.append(val)
        return

    if isinstance(current, containers.RepeatedCompositeFieldContainer):
        current.add().CopyFrom(val)
        return

    if isinstance(current, (int, float, bool, str, bytes)):
        setattr(obj, name, val)
        return

    if isinstance(current, message.Message):
        current.CopyFrom(val)
        return

    raise RuntimeError("Unsupported type: {}".format(type(current)))
def _fields_to_object(descriptor, fields):
    """Instantiate a Protobuf message and populate it from name/value pairs.

    Args:
        descriptor: Message descriptor; its concrete class is instantiated.
        fields: Iterable of ``(name, value)`` pairs. A value that is a
            tuple is treated as the field pairs of a nested message and is
            converted recursively.

    Returns:
        The populated message instance.
    """
    # pylint: disable=protected-access
    instance = descriptor._concrete_class()
    for field_name, field_value in fields:
        if not isinstance(field_value, tuple):
            _assign_to_field(instance, field_name, field_value)
            continue
        # Tuple values describe a sub-message: build it first, then assign.
        nested_descriptor = descriptor.fields_by_name[field_name].message_type
        nested_object = _fields_to_object(nested_descriptor, field_value)
        _assign_to_field(instance, field_name, nested_object)
    return instance
class ProtobufGenerator:
    """A "fuzzing strategy" bound to a single Protobuf message descriptor.

    Two strategies are available:

    - permute()
        Generate permutations of fuzzed values for the fields.
    - linear()
        Generate fuzzed instances in lock-step (this is equivalent to
        running zip(*fields)).
    """

    def __init__(self, descriptor):
        """Remember the message descriptor; start with no dependencies."""
        self._descriptor = descriptor
        self._dependencies = []

    def _iteration_helper(self, iter_class, limit):
        """Yield message instances built by the given composite generator.

        Args:
            iter_class: Composite generator class handed to
                ``descriptor_to_generator``.
            limit: When truthy, passed to the generator's ``set_limit``.
        """
        field_source = descriptor_to_generator(self._descriptor, iter_class)
        if limit:
            field_source.set_limit(limit)

        # Dependencies must be registered before any value is produced.
        for dependency in self._dependencies:
            field_source.make_dependent(*dependency)

        for field_values in field_source:
            yield _fields_to_object(self._descriptor, field_values)

    def add_dependency(self, source, target, action):
        """Create a dependency between fields source and target via callable action.

        The dependency is only recorded here; it is handed to the
        underlying generator when iteration starts.

        >>> permuter = protofuzz.from_description_string('''
        ... message Address {
        ...   required uint32 one = 1;
        ...   required uint32 two = 2;
        ... }''')['Address']
        >>> permuter.add_dependency('one', 'two', lambda val: max(0,val-1))
        >>> for obj in permuter.linear():
        ...   print("obj = {}".format(obj))
        ...
        obj = one: 0
        two: 1
        obj = one: 256
        two: 257
        obj = one: 4096
        two: 4097
        obj = one: 1073741823
        two: 1073741824
        """
        dependency = (source, target, action)
        self._dependencies.append(dependency)

    def permute(self, limit=0):
        """Return a fuzzer that permutes all the fields with fuzzed values."""
        return self._iteration_helper(gen.Product, limit)

    def linear(self, limit=0):
        """Return a fuzzer that emulates "zip" behavior."""
        return self._iteration_helper(gen.Zip, limit)
def _module_to_generators(pb_module):
    """Map every message type of a protobuf module to a ProtobufGenerator.

    This is typically used with modules that contain multiple type
    definitions.

    Args:
        pb_module: A protobuf module exposing
            ``DESCRIPTOR.message_types_by_name``.

    Returns:
        A dict from message name to ``ProtobufGenerator``, or ``None`` when
        ``pb_module`` is falsy.
    """
    if not pb_module:
        return None

    generators = {}
    by_name = pb_module.DESCRIPTOR.message_types_by_name
    for type_name, type_descriptor in by_name.items():
        generators[type_name] = ProtobufGenerator(type_descriptor)
    return generators
def from_file(protobuf_file):
    """Return dict of generators from a .proto file or a generated _pb2.py file.

    A _pb2.py file should be the output of the Protobuf compiler; users
    should not attempt to import arbitrary Python files.

    Args:
        protobuf_file(str) -- The path to the .proto file or pre-generated
            _pb2.py file.

    Returns:
        A dict indexed by message name of ProtobufGenerator objects.
        These can be used to create inter-field dependencies or to
        generate messages.

    Raises:
        FileNotFoundError: If the _pb2.py file is not found.
        ModuleNotFoundError: If there is a nested protobuf import, see
            issue #11.
        BadProtobuf: If the .proto file is incorrectly formatted or named.
        ProtocNotFound: If the protoc compiler was not found on $PATH.
        Any Python import error (e.g. AttributeError, IndentationError) if
            the _pb2.py file is not a valid generated file.
    """
    loaded_module = pbimport.from_file(protobuf_file)
    return _module_to_generators(loaded_module)
def from_description_string(protobuf_desc):
    """Return dict of generators from the text of a .proto definition.

    Args:
        protobuf_desc(str) -- The description of protobuf messages;
            contents of what would usually go into a .proto file.

    Returns:
        A dict indexed by message name of ProtobufGenerator objects. These
        can be used to create inter-field dependencies or to generate
        messages.

    Raises:
        ProtocNotFound: If the protoc compiler was not found on $PATH.
    """
    loaded_module = pbimport.from_string(protobuf_desc)
    return _module_to_generators(loaded_module)
def from_protobuf_class(protobuf_class):
    """Return a generator for an already-loaded Protobuf class.

    Args:
        protobuf_class(Message) -- A class object created from
            Protobuf-generated code; its ``DESCRIPTOR`` attribute is used.

    Returns:
        A ProtobufGenerator instance that can be used to create inter-field
        dependencies or to generate messages.
    """
    class_descriptor = protobuf_class.DESCRIPTOR
    return ProtobufGenerator(class_descriptor)