enclose-io/compiler

View on GitHub
current/tools/inspector_protocol/code_generator.py

Summary

Maintainability
F
2 mos
Test Coverage
#!/usr/bin/env python
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os.path
import sys
import argparse
import collections
import functools
import re
import copy
try:
    import json
except ImportError:
    import simplejson as json

import pdl

try:
    unicode
except NameError:
    # Define unicode for Py3
    def unicode(s, *_):
        return s

# Path handling for libraries and templates
# Paths have to be normalized because Jinja uses the exact template path to
# determine the hash used in the cache filename, and we need a pre-caching step
# to be concurrency-safe. Use absolute path because __file__ is absolute if
# module is imported, and relative if executed directly.
# If paths differ between pre-caching and individual file compilation, the cache
# is regenerated, which causes a race condition and breaks concurrent build,
# since some compile processes will try to read the partially written cache.
module_path, module_filename = os.path.split(os.path.realpath(__file__))

def read_config():
    # pylint: disable=W0703
    def json_to_object(data, output_base):
        def json_object_hook(object_dict):
            items = [(k, os.path.join(output_base, v) if k == "path" else v) for (k, v) in object_dict.items()]
            items = [(k, os.path.join(output_base, v) if k == "output" else v) for (k, v) in items]
            keys, values = list(zip(*items))
            # 'async' is a python 3.7 keyword. Don't use namedtuple(rename=True)
            # because that only renames it in python 3 but not python 2.
            keys = tuple('async_' if k == 'async' else k for k in keys)
            return collections.namedtuple('X', keys)(*values)
        return json.loads(data, object_hook=json_object_hook)

    def init_defaults(config_tuple, path, defaults):
        keys = list(config_tuple._fields)  # pylint: disable=E1101
        values = [getattr(config_tuple, k) for k in keys]
        for i in range(len(keys)):
            if hasattr(values[i], "_fields"):
                values[i] = init_defaults(values[i], path + "." + keys[i], defaults)
        for optional in defaults:
            if optional.find(path + ".") != 0:
                continue
            optional_key = optional[len(path) + 1:]
            if optional_key.find(".") == -1 and optional_key not in keys:
                keys.append(optional_key)
                values.append(defaults[optional])
        return collections.namedtuple('X', keys)(*values)

    try:
        cmdline_parser = argparse.ArgumentParser()
        cmdline_parser.add_argument("--output_base", type=unicode, required=True)
        cmdline_parser.add_argument("--jinja_dir", type=unicode, required=True)
        cmdline_parser.add_argument("--config", type=unicode, required=True)
        cmdline_parser.add_argument("--config_value", default=[], action="append")
        arg_options = cmdline_parser.parse_args()
        jinja_dir = arg_options.jinja_dir
        output_base = arg_options.output_base
        config_file = arg_options.config
        config_values = arg_options.config_value
    except Exception:
        # Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
        exc = sys.exc_info()[1]
        sys.stderr.write("Failed to parse command-line arguments: %s\n\n" % exc)
        exit(1)

    try:
        config_json_file = open(config_file, "r")
        config_json_string = config_json_file.read()
        config_partial = json_to_object(config_json_string, output_base)
        config_json_file.close()
        defaults = {
            ".use_snake_file_names": False,
            ".use_title_case_methods": False,
            ".imported": False,
            ".imported.export_macro": "",
            ".imported.export_header": False,
            ".imported.header": False,
            ".imported.package": False,
            ".imported.options": False,
            ".protocol.export_macro": "",
            ".protocol.export_header": False,
            ".protocol.options": False,
            ".protocol.file_name_prefix": "",
            ".exported": False,
            ".exported.export_macro": "",
            ".exported.export_header": False,
            ".lib": False,
            ".lib.export_macro": "",
            ".lib.export_header": False,
        }
        for key_value in config_values:
            parts = key_value.split("=")
            if len(parts) == 2:
                defaults["." + parts[0]] = parts[1]
        return (jinja_dir, config_file, init_defaults(config_partial, "", defaults))
    except Exception:
        # Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
        exc = sys.exc_info()[1]
        sys.stderr.write("Failed to parse config file: %s\n\n" % exc)
        exit(1)


# ---- Begin of utilities exposed to generator ----


def to_title_case(name):
    return name[:1].upper() + name[1:]


def dash_to_camelcase(word):
    prefix = ""
    if word[0] == "-":
        prefix = "Negative"
        word = word[1:]
    return prefix + "".join(to_title_case(x) or "-" for x in word.split("-"))


def to_snake_case(name):
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name, sys.maxsize).lower()


def to_method_case(config, name):
    if config.use_title_case_methods:
        return to_title_case(name)
    return name


def join_arrays(dict, keys):
    result = []
    for key in keys:
        if key in dict:
            result += dict[key]
    return result


def format_include(config, header, file_name=None):
    if file_name is not None:
        header = header + "/" + file_name + ".h"
    header = "\"" + header + "\"" if header[0] not in "<\"" else header
    if config.use_snake_file_names:
        header = to_snake_case(header)
    return header


def format_domain_include(config, header, file_name):
    return format_include(config, header, config.protocol.file_name_prefix + file_name)


def to_file_name(config, file_name):
    if config.use_snake_file_names:
        return to_snake_case(file_name).replace(".cpp", ".cc")
    return file_name


# ---- End of utilities exposed to generator ----


def initialize_jinja_env(jinja_dir, cache_dir, config):
    # pylint: disable=F0401
    sys.path.insert(1, os.path.abspath(jinja_dir))
    import jinja2

    jinja_env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(module_path),
        # Bytecode cache is not concurrency-safe unless pre-cached:
        # if pre-cached this is read-only, but writing creates a race condition.
        bytecode_cache=jinja2.FileSystemBytecodeCache(cache_dir),
        keep_trailing_newline=True,  # newline-terminate generated files
        lstrip_blocks=True,  # so can indent control flow tags
        trim_blocks=True)
    jinja_env.filters.update({"to_title_case": to_title_case, "dash_to_camelcase": dash_to_camelcase, "to_method_case": functools.partial(to_method_case, config)})
    jinja_env.add_extension("jinja2.ext.loopcontrols")
    return jinja_env


def create_imported_type_definition(domain_name, type, imported_namespace):
    # pylint: disable=W0622
    return {
        "return_type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
        "pass_type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
        "to_raw_type": "%s.get()",
        "to_pass_type": "std::move(%s)",
        "to_rvalue": "std::move(%s)",
        "type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
        "raw_type": "%s::%s::API::%s" % (imported_namespace, domain_name, type["id"]),
        "raw_pass_type": "%s::%s::API::%s*" % (imported_namespace, domain_name, type["id"]),
        "raw_return_type": "%s::%s::API::%s*" % (imported_namespace, domain_name, type["id"]),
    }


def create_user_type_definition(domain_name, type):
    # pylint: disable=W0622
    return {
        "return_type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
        "pass_type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
        "to_raw_type": "%s.get()",
        "to_pass_type": "std::move(%s)",
        "to_rvalue": "std::move(%s)",
        "type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
        "raw_type": "protocol::%s::%s" % (domain_name, type["id"]),
        "raw_pass_type": "protocol::%s::%s*" % (domain_name, type["id"]),
        "raw_return_type": "protocol::%s::%s*" % (domain_name, type["id"]),
    }


def create_object_type_definition():
    # pylint: disable=W0622
    return {
        "return_type": "std::unique_ptr<protocol::DictionaryValue>",
        "pass_type": "std::unique_ptr<protocol::DictionaryValue>",
        "to_raw_type": "%s.get()",
        "to_pass_type": "std::move(%s)",
        "to_rvalue": "std::move(%s)",
        "type": "std::unique_ptr<protocol::DictionaryValue>",
        "raw_type": "protocol::DictionaryValue",
        "raw_pass_type": "protocol::DictionaryValue*",
        "raw_return_type": "protocol::DictionaryValue*",
    }


def create_any_type_definition():
    # pylint: disable=W0622
    return {
        "return_type": "std::unique_ptr<protocol::Value>",
        "pass_type": "std::unique_ptr<protocol::Value>",
        "to_raw_type": "%s.get()",
        "to_pass_type": "std::move(%s)",
        "to_rvalue": "std::move(%s)",
        "type": "std::unique_ptr<protocol::Value>",
        "raw_type": "protocol::Value",
        "raw_pass_type": "protocol::Value*",
        "raw_return_type": "protocol::Value*",
    }


def create_string_type_definition():
    # pylint: disable=W0622
    return {
        "return_type": "String",
        "pass_type": "const String&",
        "to_pass_type": "%s",
        "to_raw_type": "%s",
        "to_rvalue": "%s",
        "type": "String",
        "raw_type": "String",
        "raw_pass_type": "const String&",
        "raw_return_type": "String",
    }


def create_binary_type_definition():
    # pylint: disable=W0622
    return {
        "return_type": "Binary",
        "pass_type": "const Binary&",
        "to_pass_type": "%s",
        "to_raw_type": "%s",
        "to_rvalue": "%s",
        "type": "Binary",
        "raw_type": "Binary",
        "raw_pass_type": "const Binary&",
        "raw_return_type": "Binary",
    }


def create_primitive_type_definition(type):
    # pylint: disable=W0622
    typedefs = {
        "number": "double",
        "integer": "int",
        "boolean": "bool"
    }
    defaults = {
        "number": "0",
        "integer": "0",
        "boolean": "false"
    }
    jsontypes = {
        "number": "TypeDouble",
        "integer": "TypeInteger",
        "boolean": "TypeBoolean",
    }
    return {
        "return_type": typedefs[type],
        "pass_type": typedefs[type],
        "to_pass_type": "%s",
        "to_raw_type": "%s",
        "to_rvalue": "%s",
        "type": typedefs[type],
        "raw_type": typedefs[type],
        "raw_pass_type": typedefs[type],
        "raw_return_type": typedefs[type],
        "default_value": defaults[type]
    }


def wrap_array_definition(type):
    # pylint: disable=W0622
    return {
        "return_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
        "pass_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
        "to_raw_type": "%s.get()",
        "to_pass_type": "std::move(%s)",
        "to_rvalue": "std::move(%s)",
        "type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
        "raw_type": "protocol::Array<%s>" % type["raw_type"],
        "raw_pass_type": "protocol::Array<%s>*" % type["raw_type"],
        "raw_return_type": "protocol::Array<%s>*" % type["raw_type"],
        "out_type": "protocol::Array<%s>&" % type["raw_type"],
    }


class Protocol(object):
    def __init__(self, config):
        self.config = config
        self.json_api = {"domains": []}
        self.imported_domains = []
        self.exported_domains = []
        self.generate_domains = self.read_protocol_file(config.protocol.path)

        if config.protocol.options:
            self.generate_domains = [rule.domain for rule in config.protocol.options]
            self.exported_domains = [rule.domain for rule in config.protocol.options if hasattr(rule, "exported")]

        if config.imported:
            self.imported_domains = self.read_protocol_file(config.imported.path)
            if config.imported.options:
                self.imported_domains = [rule.domain for rule in config.imported.options]

        self.patch_full_qualified_refs()
        self.create_notification_types()
        self.create_type_definitions()
        self.generate_used_types()


    def read_protocol_file(self, file_name):
        input_file = open(file_name, "r")
        parsed_json = pdl.loads(input_file.read(), file_name)
        input_file.close()
        version = parsed_json["version"]["major"] + "." + parsed_json["version"]["minor"]
        domains = []
        for domain in parsed_json["domains"]:
            domains.append(domain["domain"])
            domain["version"] = version
        self.json_api["domains"] += parsed_json["domains"]
        return domains


    def patch_full_qualified_refs(self):
        def patch_full_qualified_refs_in_domain(json, domain_name):
            if isinstance(json, list):
                for item in json:
                    patch_full_qualified_refs_in_domain(item, domain_name)
            if not isinstance(json, dict):
                return
            for key in json:
                if key == "type" and json[key] == "string":
                    json[key] = domain_name + ".string"
                if key != "$ref":
                    patch_full_qualified_refs_in_domain(json[key], domain_name)
                    continue
                if json["$ref"].find(".") == -1:
                    json["$ref"] = domain_name + "." + json["$ref"]
            return

        for domain in self.json_api["domains"]:
            patch_full_qualified_refs_in_domain(domain, domain["domain"])


    def all_references(self, json):
        refs = set()
        if isinstance(json, list):
            for item in json:
                refs |= self.all_references(item)
        if not isinstance(json, dict):
            return refs
        for key in json:
            if key != "$ref":
                refs |= self.all_references(json[key])
            else:
                refs.add(json["$ref"])
        return refs

    def generate_used_types(self):
        all_refs = set()
        for domain in self.json_api["domains"]:
            domain_name = domain["domain"]
            if "commands" in domain:
                for command in domain["commands"]:
                    if self.generate_command(domain_name, command["name"]):
                        all_refs |= self.all_references(command)
            if "events" in domain:
                for event in domain["events"]:
                    if self.generate_event(domain_name, event["name"]):
                        all_refs |= self.all_references(event)
                        all_refs.add(domain_name + "." + to_title_case(event["name"]) + "Notification")

        dependencies = self.generate_type_dependencies()
        queue = set(all_refs)
        while len(queue):
            ref = queue.pop()
            if ref in dependencies:
                queue |= dependencies[ref] - all_refs
                all_refs |= dependencies[ref]
        self.used_types = all_refs


    def generate_type_dependencies(self):
        dependencies = dict()
        domains_with_types = (x for x in self.json_api["domains"] if "types" in x)
        for domain in domains_with_types:
            domain_name = domain["domain"]
            for type in domain["types"]:
                related_types = self.all_references(type)
                if len(related_types):
                    dependencies[domain_name + "." + type["id"]] = related_types
        return dependencies


    def create_notification_types(self):
        for domain in self.json_api["domains"]:
            if "events" in domain:
                for event in domain["events"]:
                    event_type = dict()
                    event_type["description"] = "Wrapper for notification params"
                    event_type["type"] = "object"
                    event_type["id"] = to_title_case(event["name"]) + "Notification"
                    if "parameters" in event:
                        event_type["properties"] = copy.deepcopy(event["parameters"])
                    if "types" not in domain:
                        domain["types"] = list()
                    domain["types"].append(event_type)


    def create_type_definitions(self):
        imported_namespace = "::".join(self.config.imported.namespace) if self.config.imported else ""
        self.type_definitions = {}
        self.type_definitions["number"] = create_primitive_type_definition("number")
        self.type_definitions["integer"] = create_primitive_type_definition("integer")
        self.type_definitions["boolean"] = create_primitive_type_definition("boolean")
        self.type_definitions["object"] = create_object_type_definition()
        self.type_definitions["any"] = create_any_type_definition()
        self.type_definitions["binary"] = create_binary_type_definition()
        for domain in self.json_api["domains"]:
            self.type_definitions[domain["domain"] + ".string"] = create_string_type_definition()
            self.type_definitions[domain["domain"] + ".binary"] = create_binary_type_definition()
            if not ("types" in domain):
                continue
            for type in domain["types"]:
                type_name = domain["domain"] + "." + type["id"]
                if type["type"] == "object" and domain["domain"] in self.imported_domains:
                    self.type_definitions[type_name] = create_imported_type_definition(domain["domain"], type, imported_namespace)
                elif type["type"] == "object":
                    self.type_definitions[type_name] = create_user_type_definition(domain["domain"], type)
                elif type["type"] == "array":
                    self.type_definitions[type_name] = self.resolve_type(type)
                elif type["type"] == domain["domain"] + ".string":
                    self.type_definitions[type_name] = create_string_type_definition()
                elif type["type"] == domain["domain"] + ".binary":
                    self.type_definitions[type_name] = create_binary_type_definition()
                else:
                    self.type_definitions[type_name] = create_primitive_type_definition(type["type"])


    def check_options(self, options, domain, name, include_attr, exclude_attr, default):
        for rule in options:
            if rule.domain != domain:
                continue
            if include_attr and hasattr(rule, include_attr):
                return name in getattr(rule, include_attr)
            if exclude_attr and hasattr(rule, exclude_attr):
                return name not in getattr(rule, exclude_attr)
            return default
        return False


    # ---- Begin of methods exposed to generator


    def type_definition(self, name):
        return self.type_definitions[name]


    def resolve_type(self, prop):
        if "$ref" in prop:
            return self.type_definitions[prop["$ref"]]
        if prop["type"] == "array":
            return wrap_array_definition(self.resolve_type(prop["items"]))
        return self.type_definitions[prop["type"]]


    def generate_command(self, domain, command):
        if not self.config.protocol.options:
            return domain in self.generate_domains
        return self.check_options(self.config.protocol.options, domain, command, "include", "exclude", True)


    def generate_event(self, domain, event):
        if not self.config.protocol.options:
            return domain in self.generate_domains
        return self.check_options(self.config.protocol.options, domain, event, "include_events", "exclude_events", True)


    def generate_type(self, domain, typename):
        return domain + "." + typename in self.used_types


    def is_async_command(self, domain, command):
        if not self.config.protocol.options:
            return False
        return self.check_options(self.config.protocol.options, domain, command, "async_", None, False)


    def is_exported(self, domain, name):
        if not self.config.protocol.options:
            return False
        return self.check_options(self.config.protocol.options, domain, name, "exported", None, False)


    def is_imported(self, domain, name):
        if not self.config.imported:
            return False
        if not self.config.imported.options:
            return domain in self.imported_domains
        return self.check_options(self.config.imported.options, domain, name, "imported", None, False)


    def is_exported_domain(self, domain):
        return domain in self.exported_domains


    def generate_disable(self, domain):
        if "commands" not in domain:
            return True
        for command in domain["commands"]:
            if command["name"] == "disable" and self.generate_command(domain["domain"], "disable"):
                return False
        return True


    def is_imported_dependency(self, domain):
        return domain in self.generate_domains or domain in self.imported_domains


def main():
    jinja_dir, config_file, config = read_config()

    protocol = Protocol(config)

    if not config.exported and len(protocol.exported_domains):
        sys.stderr.write("Domains [%s] are exported, but config is missing export entry\n\n" % ", ".join(protocol.exported_domains))
        exit(1)

    if not os.path.exists(config.protocol.output):
        os.mkdir(config.protocol.output)
    if len(protocol.exported_domains) and not os.path.exists(config.exported.output):
        os.mkdir(config.exported.output)
    jinja_env = initialize_jinja_env(jinja_dir, config.protocol.output, config)

    inputs = []
    inputs.append(__file__)
    inputs.append(config_file)
    inputs.append(config.protocol.path)
    if config.imported:
        inputs.append(config.imported.path)
    templates_dir = os.path.join(module_path, "templates")
    inputs.append(os.path.join(templates_dir, "TypeBuilder_h.template"))
    inputs.append(os.path.join(templates_dir, "TypeBuilder_cpp.template"))
    inputs.append(os.path.join(templates_dir, "Exported_h.template"))
    inputs.append(os.path.join(templates_dir, "Imported_h.template"))

    h_template = jinja_env.get_template("templates/TypeBuilder_h.template")
    cpp_template = jinja_env.get_template("templates/TypeBuilder_cpp.template")
    exported_template = jinja_env.get_template("templates/Exported_h.template")
    imported_template = jinja_env.get_template("templates/Imported_h.template")

    outputs = dict()

    for domain in protocol.json_api["domains"]:
        class_name = domain["domain"]
        file_name = config.protocol.file_name_prefix + class_name
        template_context = {
            "protocol": protocol,
            "config": config,
            "domain": domain,
            "join_arrays": join_arrays,
            "format_include": functools.partial(format_include, config),
            "format_domain_include": functools.partial(format_domain_include, config),
        }

        if domain["domain"] in protocol.generate_domains:
            outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".h"))] = h_template.render(template_context)
            outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".cpp"))] = cpp_template.render(template_context)
            if domain["domain"] in protocol.exported_domains:
                outputs[os.path.join(config.exported.output, to_file_name(config, file_name + ".h"))] = exported_template.render(template_context)
        if domain["domain"] in protocol.imported_domains:
            outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".h"))] = imported_template.render(template_context)

    if config.lib:
        template_context = {
            "config": config,
            "format_include": functools.partial(format_include, config),
        }

        lib_templates_dir = os.path.join(module_path, "lib")
        # Note these should be sorted in the right order.
        # TODO(dgozman): sort them programmatically based on commented includes.
        protocol_h_templates = [
            "ErrorSupport_h.template",
            "Values_h.template",
            "Object_h.template",
            "ValueConversions_h.template",
            "Maybe_h.template",
            "Array_h.template",
            "DispatcherBase_h.template",
            "Parser_h.template",
            "encoding_h.template",
        ]

        protocol_cpp_templates = [
            "Protocol_cpp.template",
            "ErrorSupport_cpp.template",
            "Values_cpp.template",
            "Object_cpp.template",
            "DispatcherBase_cpp.template",
            "Parser_cpp.template",
            "encoding_cpp.template",
        ]

        forward_h_templates = [
            "Forward_h.template",
            "Allocator_h.template",
            "FrontendChannel_h.template",
        ]

        base_string_adapter_h_templates = [
            "base_string_adapter_h.template",
        ]

        base_string_adapter_cc_templates = [
            "base_string_adapter_cc.template",
        ]

        def generate_lib_file(file_name, template_files):
            parts = []
            for template_file in template_files:
                inputs.append(os.path.join(lib_templates_dir, template_file))
                template = jinja_env.get_template("lib/" + template_file)
                parts.append(template.render(template_context))
            outputs[file_name] = "\n\n".join(parts)

        generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Forward.h")), forward_h_templates)
        generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Protocol.h")), protocol_h_templates)
        generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Protocol.cpp")), protocol_cpp_templates)
        generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "base_string_adapter.h")), base_string_adapter_h_templates)
        generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "base_string_adapter.cc")), base_string_adapter_cc_templates)

    # Make gyp / make generatos happy, otherwise make rebuilds world.
    inputs_ts = max(map(os.path.getmtime, inputs))
    up_to_date = True
    for output_file in outputs.keys():
        if not os.path.exists(output_file) or os.path.getmtime(output_file) < inputs_ts:
            up_to_date = False
            break
    if up_to_date:
        sys.exit()

    for file_name, content in outputs.items():
        out_file = open(file_name, "w")
        out_file.write(content)
        out_file.close()


main()