# lib/xdrgen/generators/python.rb
# Notes:
# 1. If the .x file contains Python reserved words, rename them to non-reserved words before generating.
# 2. Generate the files with the following command:
#    xdrgen -o OUTPUT_DIR INPUT -l python
# 3. The generated code is unformatted; format it with the following commands:
#    autoflake --in-place --ignore-init-module-imports --remove-all-unused-imports OUTPUT_DIR/*.py
#    isort OUTPUT_DIR/
#    black OUTPUT_DIR/
module Xdrgen
module Generators
class Python < Xdrgen::Generators::Base
# 2**32 - 1; substituted wherever a declaration carries no explicit size.
MAX_SIZE = (2 ** 32) - 1
# Names of unions rendered with deferred arm imports (see render_definition,
# which passes `true` to render_union for these). Frozen so the list cannot
# be mutated at runtime.
CIRCLE_IMPORT_UNION = %w[SCVal SCSpecTypeDef].freeze
# Entry point of the generator.
#
# Opens constants.py and __init__.py, writes their fixed headers, copies the
# base classes and then renders every definition reachable from @top.
def generate
  constants_header = <<~PYTHON
    # This is an automatically generated file.
    # DO NOT EDIT or your changes may be overwritten
  PYTHON
  init_header = <<~PYTHON
    # Automatically generated by xdrgen
    # DO NOT EDIT or your changes may be overwritten
    from .base import *
    from .constants import *
  PYTHON

  @constants_out = @output.open("constants.py")
  @constants_out.puts(constants_header)

  @init_out = @output.open("__init__.py")
  @init_out.puts(init_header)

  render_base_classes
  render_definitions(@top)
end
private
# Renders all definitions of +node+, then recurses into each of its
# namespaces.
def render_definitions(node)
  node.definitions.each do |definition|
    render_definition(definition)
  end

  node.namespaces.each do |namespace|
    render_definitions(namespace)
  end
end
# Renders the definitions nested inside +defn+, if it exposes any.
# Objects that do not respond to #nested_definitions are ignored.
def render_nested_definitions(defn)
  if defn.respond_to?(:nested_definitions)
    defn.nested_definitions.each do |nested|
      render_definition(nested)
    end
  end
end
# Renders a single definition, dispatching on its AST class.
#
# Nested definitions are always rendered first. Unions whose name appears in
# CIRCLE_IMPORT_UNION are rendered with the second argument of render_union
# set to true. Definitions of any other class are ignored.
def render_definition(defn)
  render_nested_definitions(defn)

  case defn
  when AST::Definitions::Struct then render_struct(defn)
  when AST::Definitions::Enum then render_enum(defn)
  when AST::Definitions::Union
    render_union(defn, CIRCLE_IMPORT_UNION.include?(defn.name))
  when AST::Definitions::Typedef then render_typedef(defn)
  when AST::Definitions::Const then render_const(defn)
  end
end
# Appends one constant to constants.py: its XDR source as a `#:` comment,
# followed by an `NAME: int = VALUE` assignment.
def render_const(const)
  sink = @constants_out
  render_const_source_comment(sink, const)
  sink.puts format("%s: int = %s", const.name, const.value)
end
# Renders an XDR enum as a Python IntEnum subclass in its own module.
#
# Registers the module in __init__.py, then writes the class with one member
# per enum value, `pack`/`unpack` methods and the shared XDR helper methods.
#
# The bodies of the emitted Python methods are indented relative to their
# `def` lines (Python requires this), and the file is closed after the class
# body has been written, as the other renderers do.
def render_enum(enum)
  enum_name = name(enum)
  module_name = enum_name.underscore
  @init_out.puts "from .#{module_name} import #{enum_name}"

  out = @output.open("#{module_name}.py")
  render_common_import(out)
  out.puts "__all__ = ['#{enum_name}']"
  out.puts "class #{enum_name}(IntEnum):"
  out.indent(2) do
    render_source_comment(out, enum)
    enum.members.each do |member|
      out.puts "#{member.name} = #{member.value}"
    end
    out.puts <<~HEREDOC
      def pack(self, packer: Packer) -> None:
          packer.pack_int(self.value)
      @classmethod
      def unpack(cls, unpacker: Unpacker) -> #{enum_name}:
          value = unpacker.unpack_int()
          return cls(value)
    HEREDOC
    render_xdr_utils(out, enum_name)
  end
  out.close
end
# Renders an XDR typedef as a Python wrapper class in its own module.
#
# The class holds a single attribute named after the typedef and provides
# `__init__`, `pack`, `unpack`, the shared XDR helpers, `__hash__`, `__eq__`
# and `__str__`.
#
# The bodies of the emitted `__hash__`/`__eq__`/`__str__` methods are
# indented relative to their `def` lines, which Python requires.
def render_typedef(typedef)
  typedef_name = typedef.name.camelize
  field = typedef.name.underscore
  @init_out.puts "from .#{field} import #{typedef_name}"

  out = @output.open("#{field}.py")
  render_common_import(out)
  render_import(out, typedef, typedef_name)
  out.puts "__all__ = ['#{typedef_name}']"
  out.puts "class #{typedef_name}:"
  out.indent(2) do
    render_source_comment(out, typedef)

    out.puts "def __init__(self, #{field}: #{type_hint_string typedef, typedef_name}) -> None:"
    out.indent(2) do
      render_array_length_checker(typedef, out)
      out.puts "self.#{field} = #{field}"
    end

    out.puts "def pack(self, packer: Packer) -> None:"
    out.indent(2) do
      encode_member(typedef, out)
    end

    out.puts "@classmethod"
    out.puts "def unpack(cls, unpacker: Unpacker) -> #{typedef_name}:"
    out.indent(2) do
      decode_member(typedef, out)
      out.puts "return cls(#{field})"
    end

    render_xdr_utils(out, typedef_name)
    out.puts <<~HEREDOC
      def __hash__(self):
          return hash(self.#{field})
      def __eq__(self, other: object):
          if not isinstance(other, self.__class__):
              return NotImplemented
          return self.#{field} == other.#{field}
      def __str__(self):
          return f"<#{typedef_name} [#{field}={self.#{field}}]>"
    HEREDOC
  end
  out.close
end
# Emits `from .<module> import <Type>` for the type of +member+.
#
# Nothing is emitted when the type is a base type or when it is the
# container itself (+container_name+), which would be a self-import.
def render_import(out, member, container_name)
  member_type = type_string(member.type)
  return if is_base_type(member.type)
  return if member_type == container_name

  out.puts "from .#{member_type.underscore} import #{member_type}"
end
# Renders an XDR union as a Python class in its own module.
#
# union                 - the union definition to render.
# render_import_in_func - when true, arm types are imported at module level
#                         only under `if TYPE_CHECKING:` and imported again
#                         inside `unpack`, right before they are used.
#
# The emitted class has a discriminant attribute plus one attribute per
# non-void arm (all defaulting to None), `pack`, `unpack`, the shared XDR
# helpers, `__hash__`, `__eq__` and `__str__`.
#
# Changes relative to the previous implementation:
# * the default arm now also gets its function-local import in `unpack` when
#   +render_import_in_func+ is set (previously only normal arms did, leaving
#   the default arm's type available under TYPE_CHECKING only);
# * bodies of the emitted `__hash__`/`__eq__` methods are indented relative
#   to their `def` lines, which Python requires.
def render_union(union, render_import_in_func = false)
  union_name = name(union)
  module_name = union_name.underscore
  @init_out.puts "from .#{module_name} import #{union_name}"

  out = @output.open("#{module_name}.py")
  render_common_import(out)
  render_import(out, union.discriminant, union_name)

  value_arms = union.arms.reject(&:void?)
  discriminant = union.discriminant.name.underscore
  discriminant_type = type_string(union.discriminant.type)
  default_arm = union.default_arm
  value_default_arm = default_arm if default_arm.present? && !default_arm.void?

  # Right-hand side of the `if <discriminant> == ...:` test for one case.
  case_literal = lambda do |kase|
    if kase.value.is_a?(AST::Identifier)
      "#{discriminant_type}.#{kase.value.name}"
    else
      kase.value.value.to_s
    end
  end

  # This may cause duplicate imports, we can remove it with autoflake
  render_arm_imports = lambda do
    value_arms.each { |arm| render_import(out, arm.declaration, union_name) }
  end
  if render_import_in_func
    out.puts "if TYPE_CHECKING:"
    out.indent(2) { render_arm_imports.call }
  else
    render_arm_imports.call
  end

  out.puts "__all__ = ['#{union_name}']"
  out.puts "class #{union_name}:"
  out.indent(2) do
    render_source_comment(out, union)

    # The continuation lines sit inside the parentheses of the signature.
    out.puts <<~HEREDOC
      def __init__(
          self,
          #{discriminant}: #{type_hint_string union.discriminant, union_name},
    HEREDOC
    out.indent(2) do
      value_arms.each do |arm|
        out.puts "#{arm.name.underscore}: #{type_hint_string arm.declaration, union_name} = None,"
      end
    end
    out.puts ") -> None:"
    out.indent(2) do
      value_arms.each { |arm| render_array_length_checker(arm, out) }
      out.puts "self.#{discriminant} = #{discriminant}"
      value_arms.each do |arm|
        arm_field = arm.name.underscore
        out.puts "self.#{arm_field} = #{arm_field}"
      end
    end

    out.puts "def pack(self, packer: Packer) -> None:"
    out.indent(2) do
      out.puts encode_type(union.discriminant, "self.#{discriminant}")
      union.normal_arms.each do |arm|
        arm.cases.each do |kase|
          out.puts "if self.#{discriminant} == #{case_literal.call(kase)}:"
          out.indent(2) do
            encode_member(arm, out, true) unless arm.void?
            out.puts "return"
          end
        end
      end
      encode_member(value_default_arm, out, true) if value_default_arm
    end

    out.puts "@classmethod"
    out.puts "def unpack(cls, unpacker: Unpacker) -> #{union_name}:"
    out.indent(2) do
      out.puts "#{discriminant} = #{decode_type union.discriminant}"
      return_without_arm = "return cls(#{discriminant}=#{discriminant})"

      union.normal_arms.each do |arm|
        arm.cases.each do |kase|
          out.puts "if #{discriminant} == #{case_literal.call(kase)}:"
          out.indent(2) do
            if arm.void?
              out.puts return_without_arm
            else
              render_import(out, arm.declaration, union_name) if render_import_in_func
              decode_member(arm, out)
              arm_field = arm.name.underscore
              out.puts "return cls(#{discriminant}=#{discriminant}, #{arm_field}=#{arm_field})"
            end
          end
        end
      end

      if value_default_arm
        # The default arm needs the same function-local import as the other arms.
        render_import(out, value_default_arm.declaration, union_name) if render_import_in_func
        decode_member(value_default_arm, out)
        arm_field = value_default_arm.name.underscore
        out.puts "return cls(#{discriminant}=#{discriminant}, #{arm_field}=#{arm_field})"
      else
        out.puts return_without_arm
      end
    end

    render_xdr_utils(out, union_name)

    attribute_names = [discriminant] + value_arms.map { |arm| arm.name.underscore }
    hash_items = attribute_names.map { |attribute| "self.#{attribute}" }.join(", ")
    eq_terms = attribute_names.map { |attribute| "self.#{attribute}== other.#{attribute}" }.join(" and ")
    out.puts <<~HEREDOC
      def __hash__(self):
          return hash((#{hash_items},))
      def __eq__(self, other: object):
          if not isinstance(other, self.__class__):
              return NotImplemented
          return #{eq_terms}
    HEREDOC

    out.puts "def __str__(self):"
    out.indent(2) do
      out.puts "out = []"
      out.puts "out.append(f'#{discriminant}={self.#{discriminant}}')"
      value_arms.each do |arm|
        arm_field = arm.name.underscore
        out.puts "out.append(f'#{arm_field}={self.#{arm_field}}') if self.#{arm_field} is not None else None"
      end
      out.puts "return f\"<#{union_name} [{', '.join(out)}]>\""
    end
  end
  out.close
end
# Renders an XDR struct as a Python class in its own module.
#
# The emitted class takes one constructor argument per member and provides
# `pack`, `unpack`, the shared XDR helpers, `__hash__`, `__eq__` and
# `__str__`.
#
# Bodies of the emitted `__hash__`/`__eq__` methods are indented relative to
# their `def` lines, which Python requires. The former `name = name`
# self-assignment, which did nothing but shadow the `name` helper, is gone.
def render_struct(struct)
  struct_name = name(struct)
  module_name = struct_name.underscore
  @init_out.puts "from .#{module_name} import #{struct_name}"

  out = @output.open("#{module_name}.py")
  render_common_import(out)
  struct.members.each do |member|
    # This may cause duplicate imports, we can remove it through autoflake
    render_import(out, member.declaration, struct_name)
  end

  attribute_names = struct.members.map { |member| member.name.underscore }

  out.puts "__all__ = ['#{struct_name}']"
  out.puts "class #{struct_name}:"
  out.indent(2) do
    render_source_comment(out, struct)

    # `self,` sits inside the parentheses of the signature.
    out.puts <<~HEREDOC
      def __init__(
          self,
    HEREDOC
    out.indent(2) do
      struct.members.each do |member|
        out.puts "#{member.name.underscore}: #{type_hint_string member.declaration, struct_name},"
      end
    end
    out.puts ") -> None:"
    out.indent(2) do
      struct.members.each { |member| render_array_length_checker(member, out) }
      attribute_names.each do |attribute|
        out.puts "self.#{attribute} = #{attribute}"
      end
    end

    out.puts "def pack(self, packer: Packer) -> None:"
    out.indent(2) do
      struct.members.each { |member| encode_member(member, out) }
    end

    out.puts "@classmethod"
    out.puts "def unpack(cls, unpacker: Unpacker) -> #{struct_name}:"
    out.indent(2) do
      struct.members.each { |member| decode_member(member, out) }
      out.puts "return cls("
      out.indent(2) do
        attribute_names.each do |attribute|
          out.puts "#{attribute}=#{attribute},"
        end
      end
      out.puts ")"
    end

    render_xdr_utils(out, struct_name)

    hash_items = attribute_names.map { |attribute| "self.#{attribute}" }.join(", ")
    eq_terms = attribute_names.map { |attribute| "self.#{attribute}== other.#{attribute}" }.join(" and ")
    out.puts <<~HEREDOC
      def __hash__(self):
          return hash((#{hash_items},))
      def __eq__(self, other: object):
          if not isinstance(other, self.__class__):
              return NotImplemented
          return #{eq_terms}
    HEREDOC

    out.puts "def __str__(self):"
    out.indent(2) do
      out.puts "out = ["
      out.indent(2) do
        attribute_names.each do |attribute|
          out.puts "f'#{attribute}={self.#{attribute}}',"
        end
      end
      out.puts "]"
      out.puts "return f\"<#{struct_name} [{', '.join(out)}]>\""
    end
  end
  out.close
end
# Emits the Python statements that pack one member inside a `pack` method.
#
# member          - struct member, union arm or typedef being packed.
# out             - output file the statements are written to.
# is_union_member - when true, a guard raising ValueError for a None value
#                   is emitted first (all members of a union are optional).
#
# For a Void declaration only `return` is emitted and nothing else; the
# previous code fell through and went on to encode a value that does not
# exist. Optional members are prefixed with a 0/1 presence flag, arrays are
# packed item by item (with a length prefix unless fixed).
#
# The nested lines of the emitted `if`/`else`/`for` blocks are indented
# relative to their header lines, which Python requires.
def encode_member(member, out, is_union_member = false)
  case member.declaration
  when AST::Declarations::Void
    out.puts "return"
    return
  end

  field = member.name.underscore
  optional = member.type.sub_type == :optional

  if optional
    out.puts <<~HEREDOC
      if self.#{field} is None:
          packer.pack_uint(0)
      else:
          packer.pack_uint(1)
    HEREDOC
  end

  # For optional members the value is packed inside the `else:` branch above.
  out.indent(optional ? 2 : 0) do
    if is_union_member # All members of union are actually optional
      out.puts <<~HEREDOC
        if self.#{field} is None:
            raise ValueError("#{field} should not be None.")
      HEREDOC
    end

    case member.declaration
    when AST::Declarations::Array
      out.puts "packer.pack_uint(len(self.#{field}))" unless member.declaration.fixed?
      item_statement = encode_type(member.declaration, "#{field}_item")
      out.puts <<~HEREDOC
        for #{field}_item in self.#{field}:
            #{item_statement}
      HEREDOC
    else
      out.puts encode_type(member.declaration, "self.#{field}")
    end
  end
end
# Emits the Python statements that unpack one member into a local variable
# named after the member, inside an `unpack` method.
#
# For a Void declaration only `return` is emitted and nothing else; the
# previous code fell through and tried to decode a value that does not
# exist. Arrays are read item by item into a list (length taken from the
# declaration when fixed, otherwise read from the stream). Optional members
# are read only when the presence flag is non-zero, otherwise set to None.
#
# The body of the emitted `for` loop is indented relative to its header,
# which Python requires.
def decode_member(member, out)
  case member.declaration
  when AST::Declarations::Void
    out.puts "return"
    return
  end

  field = member.name.underscore
  decoded = decode_type(member.declaration)

  case member.declaration
  when AST::Declarations::Array
    if member.declaration.fixed?
      _, size = member.declaration.type.array_size
      out.puts "length = #{size}"
    else
      out.puts "length = unpacker.unpack_uint()"
    end
    out.puts <<~HEREDOC
      #{field} = []
      for _ in range(length):
          #{field}.append(#{decoded})
    HEREDOC
  else
    if member.type.sub_type == :optional
      out.puts "#{field} = #{decoded} if unpacker.unpack_uint() else None"
    else
      out.puts "#{field} = #{decoded}"
    end
  end
end
# Writes the header and import lines shared by every generated module,
# followed by a break in the output.
def render_common_import(out)
  preamble = [
    "# This is an automatically generated file.",
    "# DO NOT EDIT or your changes may be overwritten",
    "from __future__ import annotations",
    "import base64",
    "from enum import IntEnum",
    "from typing import List, Optional, TYPE_CHECKING",
    "from xdrlib3 import Packer, Unpacker",
    "from .base import Integer, UnsignedInteger, Float, Double, Hyper, UnsignedHyper, Boolean, String, Opaque",
    "from .constants import *",
  ]
  out.puts(preamble.join("\n") + "\n")
  out.break
end
# Emits a Python length check for an array member at the top of `__init__`.
#
# Fixed arrays must have exactly the declared length; variable arrays must
# not exceed the declared maximum (MAX_SIZE when none is declared). Members
# whose declaration is not an array produce no output.
#
# The emitted check is skipped only when the argument is None. The previous
# truthiness test (`if x and ...`) also skipped empty lists, so an empty
# list passed the check for a fixed-length array. The emitted `raise` is
# indented under its `if`, which Python requires.
def render_array_length_checker(member, out)
  case member.declaration
  when AST::Declarations::Array
    _, size = member.declaration.type.array_size
    field = member.name.underscore
    if member.declaration.fixed?
      out.puts <<~HEREDOC
        _expect_length = #{size}
        if #{field} is not None and len(#{field}) != _expect_length:
            raise ValueError(f"The length of `#{field}` should be {_expect_length}, but got {len(#{field})}.")
      HEREDOC
    else
      out.puts <<~HEREDOC
        _expect_max_length = #{size || MAX_SIZE}
        if #{field} is not None and len(#{field}) > _expect_max_length:
            raise ValueError(f"The maximum length of `#{field}` should be {_expect_max_length}, but got {len(#{field})}.")
      HEREDOC
    end
  end
end
# Emits the four conversion helpers shared by every generated class:
# `to_xdr_bytes`, `from_xdr_bytes`, `to_xdr` and `from_xdr`.
#
# out  - output file the methods are written to.
# name - Python class name used in the return annotations.
#
# Method bodies are indented relative to their `def` lines, which Python
# requires.
def render_xdr_utils(out, name)
  out.puts <<~HEREDOC
    def to_xdr_bytes(self) -> bytes:
        packer = Packer()
        self.pack(packer)
        return packer.get_buffer()
    @classmethod
    def from_xdr_bytes(cls, xdr: bytes) -> #{name}:
        unpacker = Unpacker(xdr)
        return cls.unpack(unpacker)
    def to_xdr(self) -> str:
        xdr_bytes = self.to_xdr_bytes()
        return base64.b64encode(xdr_bytes).decode()
    @classmethod
    def from_xdr(cls, xdr: str) -> #{name}:
        xdr_bytes = base64.b64decode(xdr.encode())
        return cls.from_xdr_bytes(xdr_bytes)
  HEREDOC
end
# Copies the bundled python/base.py template (located next to this file)
# into the output as base.py.
#
# The template is read with File.read before the output file is opened, so
# a missing template does not leave an opened, empty base.py behind.
def render_base_classes
  base_py_content = File.read(File.join(__dir__, "python", "base.py"))
  out = @output.open("base.py")
  out.puts base_py_content
  out.close
end
# Returns the Python statement that packs +value+ (a Python expression)
# according to the type of +decl+.
#
# Primitive types are wrapped in the matching helper class; opaque and string
# types also pass their size (MAX_SIZE when none is declared). Any other type
# is assumed to provide its own `pack` method. Raises for quadruple.
def encode_type(decl, value)
  wrapped = ->(helper) { "#{helper}(#{value}).pack(packer)" }

  case decl.type
  when AST::Typespecs::Int then wrapped.call("Integer")
  when AST::Typespecs::UnsignedInt then wrapped.call("UnsignedInteger")
  when AST::Typespecs::Hyper then wrapped.call("Hyper")
  when AST::Typespecs::UnsignedHyper then wrapped.call("UnsignedHyper")
  when AST::Typespecs::Float then wrapped.call("Float")
  when AST::Typespecs::Double then wrapped.call("Double")
  when AST::Typespecs::Quadruple
    raise "cannot render quadruple in Python"
  when AST::Typespecs::Bool then wrapped.call("Boolean")
  when AST::Typespecs::Opaque
    fixed_flag = decl.fixed? ? "True" : "False"
    "Opaque(#{value}, #{decl.size || MAX_SIZE}, #{fixed_flag}).pack(packer)"
  when AST::Typespecs::String
    "String(#{value}, #{decl.size || MAX_SIZE}).pack(packer)"
  else
    "#{value}.pack(packer)"
  end
end
# Returns the Python expression that unpacks a value of the type of +decl+
# from `unpacker`.
#
# Primitive types use the matching helper class; simple and nested types use
# the generated class named after the (resolved) type. Raises for quadruple
# and for any typespec it does not recognise.
def decode_type(decl)
  unpack_with = ->(helper) { "#{helper}.unpack(unpacker)" }
  type = decl.type

  case type
  when AST::Typespecs::Int then unpack_with.call("Integer")
  when AST::Typespecs::UnsignedInt then unpack_with.call("UnsignedInteger")
  when AST::Typespecs::Hyper then unpack_with.call("Hyper")
  when AST::Typespecs::UnsignedHyper then unpack_with.call("UnsignedHyper")
  when AST::Typespecs::Float then unpack_with.call("Float")
  when AST::Typespecs::Double then unpack_with.call("Double")
  when AST::Typespecs::Quadruple
    raise "cannot render quadruple in Python"
  when AST::Typespecs::Bool then unpack_with.call("Boolean")
  when AST::Typespecs::Opaque
    fixed_flag = decl.fixed? ? "True" : "False"
    "Opaque.unpack(unpacker, #{decl.size || MAX_SIZE}, #{fixed_flag})"
  when AST::Typespecs::String then unpack_with.call("String")
  when AST::Typespecs::Simple then unpack_with.call(name(type.resolved_type))
  when AST::Concerns::NestedDefinition then unpack_with.call(name(type))
  else
    raise "Unknown typespec: #{type.class.name}"
  end
end
# Writes the XDR source text of +defn+ as a Python docstring: an opening
# triple quote, an `XDR Source Code::` heading, the source text nested one
# `out.indent(2)` level deeper, and a closing triple quote.
# Namespaces produce no output.
def render_source_comment(out, defn)
  unless defn.is_a?(AST::Definitions::Namespace)
    out.puts "\"\"\"\nXDR Source Code::\n"
    out.indent(2) { out.puts defn.text_value }
    out.puts '"""'
  end
end
# Writes the XDR source text of a constant as a `#:` comment line.
# Namespaces produce no output.
def render_const_source_comment(out, defn)
  unless defn.is_a?(AST::Definitions::Namespace)
    source_text = defn.text_value
    out.puts "#: #{source_text}"
  end
end
# Returns the Python type hint for +decl+.
#
# A type equal to +container_name+ is quoted (a reference to the class being
# defined). Optional declarations are wrapped in `Optional[...]`, fixed and
# variable arrays in `List[...]`.
def type_hint_string(decl, container_name)
  base_hint = type_string(decl.type)
  base_hint = %("#{base_hint}") if base_hint == container_name

  generic = { optional: "Optional", var_array: "List", array: "List" }[decl.type.sub_type]
  generic ? "#{generic}[#{base_hint}]" : base_hint
end
# Returns true when +type+ is one of the primitive typespecs listed below
# (these are the types render_import never emits an import for), false
# otherwise.
def is_base_type(type)
  [
    AST::Typespecs::Bool,
    AST::Typespecs::Double,
    AST::Typespecs::Float,
    AST::Typespecs::Hyper,
    AST::Typespecs::Int,
    AST::Typespecs::Opaque,
    AST::Typespecs::String,
    AST::Typespecs::UnsignedHyper,
    AST::Typespecs::UnsignedInt,
  ].any? { |primitive| primitive === type }
end
# Returns the Python type name used for +type+ in hints and imports.
#
# Primitive typespecs map to Python builtins; simple, definition and nested
# types map to the generated class name. Raises for quadruple and for any
# type it does not recognise.
def type_string(type)
  case type
  when AST::Typespecs::Bool then "bool"
  when AST::Typespecs::Double, AST::Typespecs::Float then "float"
  when AST::Typespecs::Hyper, AST::Typespecs::Int then "int"
  when AST::Typespecs::Opaque then "bytes"
  when AST::Typespecs::Quadruple
    raise "no quadruple support for Python"
  when AST::Typespecs::String then "bytes"
  when AST::Typespecs::UnsignedHyper, AST::Typespecs::UnsignedInt then "int"
  when AST::Typespecs::Simple, AST::Definitions::Base, AST::Concerns::NestedDefinition
    name(type)
  else
    raise "Unknown reference type: #{type.class.name}, #{type.class.ancestors}"
  end
end
# Returns the Python class name for +named+: its own name camelized,
# prefixed with the class name of its parent definition when it is a nested
# definition.
def name(named)
  prefix = named.is_a?(AST::Concerns::NestedDefinition) ? name(named.parent_defn) : nil
  "#{prefix}#{named.name.camelize}"
end
end
end
end