#!/usr/bin/env python
"""
Python format parser
"""
import collections
import copy
import glob
import logging
import os
import re
import six
import sys
import traceback
import py010parser
import py010parser.c_parser
from py010parser import c_ast as AST
import pfp
import pfp.bitwrap as bitwrap
import pfp.errors as errors
import pfp.fields as fields
import pfp.functions as functions
import pfp.native as native
import pfp.utils as utils
logging.basicConfig(level=logging.CRITICAL)
class Decls(object):
    """Bundle a list of declaration nodes with their source coordinate.

    :param decls: the declaration nodes
    :param coord: the coordinate of the node the declarations came from
    """

    def __init__(self, decls, coord):
        self.decls, self.coord = decls, coord
class UnionDecls(Decls):
    """Marker subclass of ``Decls`` holding the declarations of a union."""
class StructDecls(Decls):
    """Marker subclass of ``Decls`` holding the declarations of a struct."""
def StructDeclWithParams(scope, struct_cls, struct_args):
    """Create a subclass of ``struct_cls`` that binds ``struct_args`` on init.

    The generated class overrides ``_pfp__init`` so that, before the parent's
    ``_pfp__init`` runs, the struct's parameter list is interpreted and
    instantiated with ``struct_args`` in ``scope``.

    :param scope: scope the parameters are instantiated in
    :param struct_cls: the struct class to specialise
    :param struct_args: the argument values for the struct's parameters
    :returns: a new class named ``struct_cls.__name__ + "_"``
    """

    def _pfp__init(self, stream):
        # mark every parameter declaration so it is treated as a function param
        for param in self._pfp__node.args.params:
            param.is_func_param = True

        params = self._pfp__interp._handle_node(
            self._pfp__node.args, scope, self, None
        )
        params.instantiate(scope, struct_args, self._pfp__interp)

        # Resolve the parent through the class created below rather than
        # ``self.__class__``: the latter points at the most-derived class and
        # makes this method call itself forever once new_class is subclassed.
        parent = super(new_class, self)
        if hasattr(parent, "_pfp__init"):
            parent._pfp__init(stream)

    new_class = type(
        struct_cls.__name__ + "_", (struct_cls,), {"_pfp__init": _pfp__init}
    )
    return new_class
def StructUnionTypeRef(curr_scope, typedef_name, refd_name, interp, node):
    """Create a typedef that resolves itself dynamically. This is needed in
    situations like:

    .. code-block:: c

        struct MY_STRUCT {
            char magic[4];
            unsigned int filesize;
        };
        typedef struct MY_STRUCT ME;
        LittleEndian();
        ME s;

    The typedef ``ME`` is handled before the ``MY_STRUCT`` declaration actually
    occurs. The typedef value for ``ME`` should not be the empty struct that is
    resolved, but should be a dynamically-looked up struct definition when
    a ``ME`` instance is actually declared.

    :param curr_scope: scope used to look up ``refd_name`` at instantiation
    :param typedef_name: name of the typedef'd type
    :param refd_name: name of the referenced struct/union
    :param interp: the interpreter handed on to ``StructUnionDef``
    :param node: the struct/union AST node, used if ``refd_name`` is unresolved
    :returns: a class whose ``__new__`` builds the resolved struct/union
    :raises TypeError: if ``node`` is neither a struct nor a union node
    """
    if isinstance(node, AST.Struct):
        cls = fields.Struct
    elif isinstance(node, AST.Union):
        cls = fields.Union
    else:
        # previously this fell through and failed later with an unbound ``cls``
        raise TypeError(
            "expected a Struct or Union node, got {}".format(
                node.__class__.__name__
            )
        )

    def __new__(cls_, *args, **kwargs):
        # look the referenced type up now, at instantiation time
        refd_type = curr_scope.get_type(refd_name)
        refd_node = node if refd_type is None else refd_type._pfp__node

        def merged_init(self, stream):
            # run the typedef class' own init first, then the resolved one
            if six.PY3:
                cls_._pfp__init(self, stream)
            else:
                cls_._pfp__init.__func__(self, stream)
            self._pfp__init_orig(stream)

        overrides = {}
        if hasattr(cls_, "_pfp__init"):
            overrides["_pfp__init"] = merged_init

        resolved_cls = StructUnionDef(
            typedef_name, interp, refd_node, overrides=overrides,
        )
        return resolved_cls(*args, **kwargs)

    return type(typedef_name, (cls,), {"__new__": __new__})
def StructUnionDef(typedef_name, interp, node, overrides=None, cls=None):
    """Build a struct/union class whose fields come from interpreting ``node``.

    :param typedef_name: name given to the generated class
    :param interp: interpreter stored on the class as ``_pfp__interp``
    :param node: the struct or union AST node, stored as ``_pfp__node``
    :param overrides: optional class members to override; a replaced member
        stays reachable under ``<name>_orig``
    :param cls: optional base class (defaults to ``fields.Struct`` or
        ``fields.Union`` depending on ``node``)
    :returns: the generated class
    """
    if overrides is None:
        overrides = {}

    if isinstance(node, AST.Struct):
        cls = fields.Struct if cls is None else cls
        decls = StructDecls(node.decls, node.coord)
    elif isinstance(node, AST.Union):
        cls = fields.Union if cls is None else cls
        decls = UnionDecls(node.decls, node.coord)

    # Construction and field parsing are split into two steps so that all
    # nested structs can be added to the root DOM even if there's an error
    # while parsing the data. Otherwise a parse error would keep the new
    # struct from being added to its parent, and the user would not be able
    # to see how far the script got.
    def __init__(self, stream=None, metadata_processor=None, do_init=True):
        cls.__init__(self, stream, metadata_processor=metadata_processor)
        if do_init:
            self._pfp__init(stream)

    def _pfp__init(self, stream):
        self._pfp__interp._handle_node(decls, ctxt=self, stream=stream)

    members = {
        "__init__": __init__,
        "_pfp__init": _pfp__init,
        "_pfp__node": node,
        "_pfp__interp": interp,
    }
    for key, value in six.iteritems(overrides or {}):
        # keep the member being replaced reachable as "<key>_orig"
        if key in members:
            members[key + "_orig"] = members[key]
        members[key] = value

    return type(typedef_name, (cls,), members)
def EnumDef(typedef_name, base_cls, enum_vals):
    """Create a ``fields.Enum`` subclass backed by ``base_cls``.

    :param typedef_name: name of the generated enum class
    :param base_cls: the class whose ``signed``, ``width``, ``endian`` and
        ``format`` attributes are copied onto the enum
    :param enum_vals: the enum values, stored as ``enum_vals``
    :returns: the generated enum class
    """
    attrs = {
        name: getattr(base_cls, name)
        for name in ("signed", "width", "endian", "format")
    }
    attrs["enum_vals"] = enum_vals
    attrs["enum_cls"] = base_cls
    return type(typedef_name, (fields.Enum,), attrs)
def ArrayDecl(item_cls, item_count):
    """Create a ``fields.Array`` subclass with a fixed item class and count.

    :param item_cls: the field class of each array item
    :param item_count: the number of items (converted with ``fields.PYVAL``)
    :returns: a class named ``Array_<item class name>_<count>``
    """
    width = fields.PYVAL(item_count)

    def __init__(self, stream=None, metadata_processor=None):
        # width and field_cls are read back from the generated class
        fields.Array.__init__(
            self,
            self.width,
            self.field_cls,
            stream,
            metadata_processor=metadata_processor,
        )

    class_name = "Array_{}_{}".format(item_cls.__name__, width)
    members = {"__init__": __init__, "width": width, "field_cls": item_cls}
    return type(class_name, (fields.Array,), members)
def LazyField(lookup_name, scope):
    """Create a field class whose real base class is looked up lazily.

    Super non-standard stuff here: when an instance is created, the base
    class is swapped for whatever ``scope.get_id(lookup_name)`` returns, and
    that class' ``__init__`` is then run. This works as long as the original
    base class does not directly inherit from object (it doesn't, since the
    original base class is ``fields.Field``).

    :param lookup_name: the name to resolve in ``scope`` at instantiation
    :param scope: the scope used for the lookup
    :returns: a class named ``<lookup_name>_lazy``
    """

    def __init__(self, stream=None):
        resolved = self._pfp__scope.get_id(self._pfp__lazy_name)
        # rebase the generated class onto the resolved class
        self.__class__.__bases__ = (resolved,)
        resolved.__init__(self, stream)

    members = {
        "__init__": __init__,
        "_pfp__scope": scope,
        "_pfp__lazy_name": lookup_name,
    }
    return type(lookup_name + "_lazy", (fields.Field,), members)
# class StructUnionDef(object):
#
# """A class used to instantiate structs/unions as
# needed (used for typedefs)"""
#
# def __init__(self, interp, node):
# """Save the interpreter and the node so that when
# this instance is called (will act like instantiation),
# the interpreter is just told to handle the node
#
# :interp: The interpreter
# :node: The node to interpret upon instantiation
# :stream: The stream that data will be parsed from
# """
# self._interp = interp
# self._node = node
# self._typedef_name = node._pfp__typedef_name
#
# def __call__(self, stream=None):
# """Create an instance of the struct/union
#
# :stream: The stream that data will be parsed from
# :returns: A struct or union instance
# """
# # TODO stream should be optional to act like other fields classes
# res = self._interp._handle_node(self._node, stream=stream)
# res._pfp__typedef_name = self._typedef_name
# # UGH TODO HACK HACK HACK!!! stupid
# res._pfp__class = self
# return res
class DebugLogger(object):
    """Indentation-aware wrapper around the root logger.

    :param bool active: when true the root logger level is set to DEBUG and
        ``debug`` emits messages; otherwise ``debug`` is a no-op
    """

    def __init__(self, active=False):
        self._log = logging.getLogger("")
        self._indent = 0
        self._active = active
        if active:
            self._log.setLevel(logging.DEBUG)

    def debug(self, prefix, msg, indent_change=0, filename=None, coord=None):
        """Log ``msg`` line by line, each prefixed and indented.

        :param prefix: text placed before every line
        :param msg: the (possibly multi-line) message
        :param indent_change: added to the indent level before logging
        :param filename: if set along with ``coord``, appended to the prefix
        :param coord: object with a ``line`` attribute, appended to the prefix
        """
        if not self._active:
            return

        self._indent += indent_change
        if coord is not None and filename:
            prefix += ":{}:{}".format(filename, coord.line)

        lead = prefix + ": " + " " * self._indent
        self._log.debug("\n".join(lead + line for line in msg.split("\n")))

    def inc(self):
        """Increase the indent level by one."""
        self._indent += 1

    def dec(self):
        """Decrease the indent level by one."""
        self._indent -= 1
class NullStream(object):
    """A stream stand-in that reads as endless null bytes and drops writes."""

    def __init__(self):
        self._pos = 0

    def read(self, num):
        """Return ``num`` null bytes (the position is not advanced)."""
        return utils.binary("\x00" * num)

    def write(self, data):
        """Discard ``data``."""
        pass

    def close(self):
        """Do nothing."""
        pass

    def seek(self, pos, seek_type=0):
        """Move the position: ``0`` is absolute, ``1`` is relative.

        A ``seek_type`` of ``2`` is accepted but ignored (we never use this
        anyways).
        """
        if seek_type == 1:
            self._pos += pos
        elif seek_type == 0:
            self._pos = pos

    def tell(self):
        """Return the current position."""
        return self._pos
class PfpTypes(object):
    """A class to hold all typedefd types in a template. Note that
    types are instantiated by having them parse a null-stream. This
    means that type creation will not work correctly for complicated
    structs that have internal control-flow"""

    _interp = None
    _scope = None
    _types_map = None
    _null_stream = None

    def __init__(self, interp, scope):
        """Init the ``PfpTypes`` class

        :param pfp.interp.PfpInterp interp: The pfp interpreter
        :param pfp.interp.Scope scope: The scope to pull all the types from
        """
        self._interp = interp
        self._scope = scope
        self._null_stream = bitwrap.BitwrappedStream(NullStream())
        self._types_map = {}

        for scope_ctxt in self._scope._scope_stack:
            for type_name, type_cls in six.iteritems(scope_ctxt["types"]):
                # typedefs recorded as lists of names are resolved to a class
                if isinstance(type_cls, list):
                    type_cls = self._interp._resolve_to_field_class(
                        type_cls, self._scope
                    )
                self._types_map[type_name] = type_cls

    def _wrap_type_instantiation(self, type_cls):
        """Wrap the creation of the type so that we can provide
        a null-stream to initialize it"""

        def wrapper(*args, **kwargs):
            # use args for struct arguments?? (currently ignored)
            return type_cls(stream=self._null_stream)

        return wrapper

    def _lookup(self, type_name):
        """Return the class registered for ``type_name`` or ``None``.

        ``_types_map`` is still the class-level ``None`` until ``__init__``
        has run (e.g. while ``copy`` probes a freshly created instance), so
        that case is treated as "no types" instead of raising ``TypeError``.
        """
        types_map = self._types_map
        if types_map is None:
            return None
        return types_map.get(type_name)

    def __getattr__(self, attr_name):
        """Return a factory for the type named ``attr_name``.

        :raises AttributeError: if no such type (or attribute) exists
        """
        type_cls = self._lookup(attr_name)
        if type_cls is not None:
            return self._wrap_type_instantiation(type_cls)
        # let this raise the usual AttributeError; name the class explicitly
        # so that subclasses resolve ``super`` correctly
        return super(PfpTypes, self).__getattribute__(attr_name)

    def __getitem__(self, attr_name):
        """Return a factory for the type named ``attr_name``.

        :raises KeyError: if no such type exists
        """
        type_cls = self._lookup(attr_name)
        if type_cls is None:
            raise KeyError(attr_name)
        return self._wrap_type_instantiation(type_cls)
class Scope(object):
    """A class to keep track of the current scope of the interpreter.

    The scope is a stack of dicts, each holding ``"types"``, ``"vars"`` and
    ``"meta"`` sub-dicts. Lookups walk the stack from the innermost scope
    outwards and may then continue into an optional parent ``Scope``.
    """

    def __init__(self, logger, parent=None):
        """Create a scope with one initial (empty) scope level.

        :param logger: object whose ``debug(prefix, msg)`` receives log output
        :param parent: optional parent ``Scope`` searched after this one
        """
        super(Scope, self).__init__()
        self._log = logger
        self._parent = parent
        self._scope_stack = []
        self.push()

    def level(self):
        """Return the current scope level (including the parent's levels)."""
        res = len(self._scope_stack)
        if self._parent is not None:
            res += self._parent.level()
        return res

    def push(self, new_scope=None):
        """Enter a new scope and make it the current one.

        :param new_scope: the scope dict to push; an empty one is created
            when omitted
        :returns: None
        """
        if new_scope is None:
            new_scope = {"types": {}, "vars": {}, "meta": {}}
        self._curr_scope = new_scope
        self._dlog("pushing new scope, scope level = {}".format(self.level()))
        self._scope_stack.append(self._curr_scope)

    def clone(self):
        """Return a new Scope object that has the curr_scope
        pinned at the current one.

        The clone shares this scope's stack list and logger; it is created
        without a parent.

        :returns: A new scope object
        """
        self._dlog("cloning the stack")
        # TODO is this really necessary to create a brand new one?
        # I think it is... need to think about it more.
        # or... are we going to need ref counters and a global
        # scope object that allows a view into (or a snapshot of)
        # a specific scope stack?
        res = Scope(self._log)
        res._scope_stack = self._scope_stack
        res._curr_scope = self._curr_scope
        return res

    def pop(self):
        """Leave the current scope.

        :returns: the scope dict that was removed
        """
        res = self._scope_stack.pop()
        self._dlog("popping scope, scope level = {}".format(self.level()))
        self._curr_scope = self._scope_stack[-1]
        return res

    def add_var(self, field_name, field, root=False):
        """Add a var to the current scope (vars are fields that
        parse the input stream).

        :param field_name: the name to store the field under
        :param field: the field
        :param root: if true, also store the field in the outermost scope
        :returns: None
        """
        self._dlog("adding var '{}' (root={})".format(field_name, root))

        # do both so it's not clobbered by intermediate values of the same name
        if root:
            self._scope_stack[0]["vars"][field_name] = field

        # TODO do we allow clobbering of vars???
        self._curr_scope["vars"][field_name] = field

    def get_var(self, name, recurse=True):
        """Return the first var of name ``name`` in the current
        scope stack (remember, vars are the ones that parse the
        input stream).

        :param name: The name of the id
        :param recurse: Whether parent scopes should also be searched
            (defaults to True)
        :returns: the var, or None if it was not found
        """
        self._dlog("getting var '{}'".format(name))
        return self._search("vars", name, recurse)

    def add_local(self, field_name, field):
        """Add a local variable in the current scope.

        The field's ``_pfp__name`` is set to ``field_name``.

        :param field_name: The field's name
        :param field: The field
        :returns: None
        """
        self._dlog("adding local '{}'".format(field_name))
        field._pfp__name = field_name
        # TODO do we allow clobbering of locals???
        self._curr_scope["vars"][field_name] = field

    def get_local(self, name, recurse=True):
        """Get the local field (search for it) from the scope stack. An alias
        for ``get_var``.

        :param name: The name of the local field
        :param recurse: Whether parent scopes should also be searched
        :returns: the local, or None if it was not found
        """
        self._dlog("getting local '{}'".format(name))
        return self._search("vars", name, recurse)

    def add_type_class(self, name, cls):
        """Store the class with the name in the current scope."""
        self._curr_scope["types"][name] = cls

    def add_refd_struct_or_union(self, name, refd_name, interp, node):
        """Add a lazily-looked up typedef struct or union.

        :param name: name of the typedefd struct/union
        :param refd_name: name of the struct/union being referenced
        :param interp: the 010 interpreter
        :param node: the node handed to ``StructUnionTypeRef``
        """
        self.add_type_class(
            name, StructUnionTypeRef(self, name, refd_name, interp, node)
        )

    def add_type_struct_or_union(self, name, interp, node):
        """Store the node with the name. When it is instantiated,
        the node itself will be handled.

        :param name: name of the typedefd struct/union
        :param interp: the 010 interpreter
        :param node: the union/struct node
        """
        self.add_type_class(name, StructUnionDef(name, interp, node))

    def add_type(self, new_name, orig_names):
        """Record the typedefd name for orig_names. Resolve orig_names
        to their core names and save those.

        :param new_name: the typedef'd name
        :param orig_names: list of names the typedef stands for; only the
            last name is resolved further
        :returns: None
        """
        self._dlog("adding a type '{}'".format(new_name))
        # TODO do we allow clobbering of types???
        res = copy.copy(orig_names)
        resolved_names = self._resolve_name(res[-1])
        if resolved_names is not None:
            res.pop()
            res += resolved_names

        self._curr_scope["types"][new_name] = res

    def get_type(self, name, recurse=True):
        """Get the names for the typename (created by typedef).

        :param name: The typedef'd name to resolve
        :param recurse: Whether parent scopes should also be searched
        :returns: whatever was stored for the name (a list of resolved names
            or a class), or None if it was not found
        """
        self._dlog("getting type '{}'".format(name))
        return self._search("types", name, recurse)

    def get_id(self, name, recurse=True):
        """Get the first id matching ``name``. Will either be a local
        or a var.

        :param name: the name to look up
        :param recurse: Whether parent scopes should also be searched
        :returns: the local/var, or None if it was not found
        """
        self._dlog("getting id '{}'".format(name))
        return self._search("vars", name, recurse)

    # ------------------
    # PRIVATE
    # ------------------

    def _dlog(self, msg):
        """Send ``msg`` to the logger, tagged with this scope's id."""
        self._log.debug(" scope({:08x})".format(id(self)), msg)

    def _resolve_name(self, name):
        """Expand ``name`` through chained typedefs.

        The last name of each typedef is looked up again until it no longer
        names a typedef.

        :param name: the name to resolve
        :returns: list of resolved names (``[name]`` if it is not a typedef)
        """
        res = [name]
        seen = {name}
        while True:
            orig_names = self._search("types", name)
            if orig_names is None:
                break

            name = orig_names[-1]
            # pop off the typedefd name
            res.pop()
            # add back on the original names
            res += orig_names

            # a typedef chain that leads back to a name already expanded
            # would otherwise loop forever
            if name in seen:
                break
            seen.add(name)

        return res

    def _search(self, category, name, recurse=True):
        """Search the scope stack for the name in the specified
        category (types/vars).

        :param category: the category to search in (types/vars)
        :param name: name to search for
        :param recurse: whether the parent scope is searched as well
        :returns: None if not found, the result of the found local/type/id
        """
        # innermost scope first
        for scope in reversed(self._scope_stack):
            res = scope[category].get(name, None)
            if res is not None:
                return res

        if recurse and self._parent is not None:
            return self._parent._search(category, name, recurse)

        return None

    # def __getattr__
    # def __setattr__
[docs]class PfpInterp(object):
"""
"""
# bitfield parsing directions (see set_bitfield_direction)
BITFIELD_DIR_LEFT_RIGHT = -1
BITFIELD_DIR_DEFAULT = 0
BITFIELD_DIR_RIGHT_LEFT = 1
# break types used by the debugger support (see set_break)
# do not break (execute until finished)
BREAK_NONE = 0
# break on the next instruction on the same level
BREAK_OVER = 1
# break on the next instruction regardless of level
BREAK_INTO = 2
# class-level registry of native functions by name, filled by add_native
_natives = {}
# class-level list of template texts parsed before any other template,
# filled by add_predefine
_predefines = []
@classmethod
def add_native(cls, name, func, ret, interp=None, send_interp=False):
    """Add the native python function ``func`` into the pfp interpreter with the
    name ``name`` and return value ``ret`` so that it can be called from
    within a template script.

    .. note::
        The :any:`@native <pfp.native.native>` decorator exists to simplify this.

    All native functions must have the signature
    ``def func(params, ctxt, scope, stream, coord [,interp])``,
    optionally allowing an interpreter param if ``send_interp`` is ``True``.

    Example:

        The example below defines a function ``Sum`` using the ``add_native`` method. ::

            import pfp.fields
            from pfp.fields import PYVAL

            def native_sum(params, ctxt, scope, stream, coord):
                return PYVAL(params[0]) + PYVAL(params[1])

            pfp.interp.PfpInterp.add_native("Sum", native_sum, pfp.fields.Int64)

    :param basestring name: The name the function will be exposed as in the interpreter.
    :param function func: The native python function that will be referenced.
    :param type(pfp.fields.Field) ret: The field class that the return value should be cast to.
    :param pfp.interp.PfpInterp interp: The specific pfp interpreter the function should be defined in.
    :param bool send_interp: If true, the current pfp interpreter will be added as an argument to the function.
    """
    # register on the given interpreter's natives, else on the class-level ones
    registry = cls._natives if interp is None else interp._natives
    registry[name] = functions.NativeFunction(name, func, ret, send_interp)
@classmethod
def add_predefine(cls, template):
    """Register a template that is run before any other template.

    This is useful for predefining types, etc.

    :param basestring template: The template text (unicode is also fine here)
    """
    predefines = cls._predefines
    predefines.append(template)
@classmethod
def define_natives(cls):
    """Define the native functions for PFP.

    Imports every ``*.py`` module (except ``__init__``) found in the
    ``native`` directory next to this file and sets ``PYVAL``/``PYSTR`` on
    each of them. Does nothing if natives are already registered.

    :raises Exception: re-raises whatever importing a native module raised,
        after writing a note to stderr
    """
    if cls._natives:
        return

    glob_pattern = os.path.join(os.path.dirname(__file__), "native", "*.py")
    for filename in glob.glob(glob_pattern):
        # strip only the extension; ``replace(".py", "")`` would also eat a
        # ".py" occurring in the middle of the file name
        basename = os.path.splitext(os.path.basename(filename))[0]
        if basename == "__init__":
            continue

        try:
            mod_base = __import__(
                "pfp.native", globals(), locals(), fromlist=[basename]
            )
        except Exception:
            sys.stderr.write(
                "cannot import native module {} at '{}'\n".format(
                    basename, filename
                )
            )
            raise

        mod = getattr(mod_base, basename)
        setattr(mod, "PYVAL", fields.get_value)
        setattr(mod, "PYSTR", fields.get_str)
def __init__(self, debug=False, parser=None, int3=True):
    """Create a new instance of the ``PfpInterp`` class.

    :param bool debug: if debug output should be used (default=``False``)
    :param :any:`py010parser.c_parser.CParser` parser: The ``py010parser.c_parser.CParser`` to use (default=``None``)
    :param bool int3: If debug breakpoints (calls to :any:`pfp.native.dbg.int3` ``Int3()``) are active (default=``True``)
    """
    # make sure the native functions have been registered
    self.__class__.define_natives()
    self._log = DebugLogger(debug)
    # TODO nested debuggers aren't currently allowed
    self._debugger = None
    # why is this here?? this isn't used at all
    self._debug = debug
    self._printf = True
    # debugger break state (see set_break)
    self._break_type = self.BREAK_NONE
    self._break_level = 0
    # set while eval() runs so evaluated statements don't trigger breaks
    self._no_debug = False
    self._padded_bitfield = True
    # TODO does this default change based on the endianness?
    self._bitfield_direction = self.BITFIELD_DIR_DEFAULT
    # whether or not debugging is allowed (ie Int3())
    self._int3 = int3
    # becomes True once load_template() has parsed a template
    self._ast_frozen = False
    self._ctxt = None
    self._scope = None
    self._coord = None
    self._orig_filename = None
    if parser is None:
        parser = py010parser.c_parser.CParser()
    # this speeds things up a bit
    self._parser = parser
    # dispatch table: node class -> handler used by _handle_node
    self._node_switch = {
        AST.FileAST: self._handle_file_ast,
        AST.Decl: self._handle_decl,
        AST.TypeDecl: self._handle_type_decl,
        AST.ByRefDecl: self._handle_byref_decl,
        AST.Struct: self._handle_struct,
        AST.Union: self._handle_union,
        AST.StructRef: self._handle_struct_ref,
        AST.IdentifierType: self._handle_identifier_type,
        AST.Typedef: self._handle_typedef,
        AST.Constant: self._handle_constant,
        AST.BinaryOp: self._handle_binary_op,
        AST.Assignment: self._handle_assignment,
        AST.ID: self._handle_id,
        AST.UnaryOp: self._handle_unary_op,
        AST.FuncDef: self._handle_func_def,
        AST.FuncCall: self._handle_func_call,
        AST.FuncDecl: self._handle_func_decl,
        AST.ParamList: self._handle_param_list,
        AST.ExprList: self._handle_expr_list,
        AST.Compound: self._handle_compound,
        AST.Return: self._handle_return,
        AST.ArrayDecl: self._handle_array_decl,
        AST.InitList: self._handle_init_list,
        AST.If: self._handle_if,
        AST.For: self._handle_for,
        AST.While: self._handle_while,
        AST.DeclList: self._handle_decl_list,
        AST.Break: self._handle_break,
        AST.Continue: self._handle_continue,
        AST.ArrayRef: self._handle_array_ref,
        AST.Enum: self._handle_enum,
        AST.Switch: self._handle_switch,
        AST.Cast: self._handle_cast,
        AST.Typename: self._handle_typename,
        AST.EmptyStatement: self._handle_empty_statement,
        AST.DoWhile: self._handle_do_while,
        AST.StructCallTypeDecl: self._handle_struct_call_type_decl,
        # ternary expressions share the if-statement handler
        AST.TernaryOp: self._handle_if,
        StructDecls: self._handle_struct_decls,
        UnionDecls: self._handle_union_decls,
    }
def _dlog(self, msg, indent_increase=0):
"""log the message to the log"""
self._log.debug(
"interp",
msg,
indent_increase,
filename=self._orig_filename,
coord=self._coord,
)
# --------------------
# PUBLIC
# --------------------
def load_template(self, template):
    """Load a template and all required predefines into this interpreter.

    Future calls to ``parse`` will not require the template to be parsed.

    :param template: the template text
    """
    self._template = template
    self._template_lines = template.split("\n")
    parsed = self._parse_string(template, predefines=True)
    self._ast = parsed
    self._dlog("parsed template into ast")
    # from now on parse() reuses this AST
    self._ast_frozen = True
def parse(
    self,
    stream,
    template=None,
    predefines=True,
    orig_filename=None,
    keep_successful=False,
    printf=True,
):
    """Parse the data stream using the template (e.g. parse the 010 template
    and interpret the template using the stream as the data source).

    :stream: The input data stream
    :template: The template to parse the stream with (not needed, and not
        used, once ``load_template`` has been called)
    :predefines: Whether the registered predefines are parsed before the template
    :orig_filename: The name of the data file being parsed
    :keep_successful: Return whatever was successfully parsed before an error. ``_pfp__error`` will contain the exception (if one was raised)
    :param bool printf: If ``False``, printfs will be noops (default=``True``)
    :returns: Pfp Dom
    :raises pfp.errors.InterpError: if no template was given or loaded
    """
    self._dlog("parsing")

    if not isinstance(stream, bitwrap.BitwrappedStream):
        stream = bitwrap.BitwrappedStream(stream)

    if template is None and not self._ast_frozen:
        raise errors.InterpError("A template must be provided")

    self._printf = printf
    self._orig_filename = orig_filename
    self._stream = stream

    if not self._ast_frozen:
        self._template = template
        self._template_lines = template.split("\n")
        self._ast = self._parse_string(template, predefines)
        self._dlog("parsed template into ast")

    dom = self._run(keep_successful)
    dom._pfp__finalize()
    return dom
def step_over(self):
    """Break on the next instruction on the same level.

    Sets the break type to ``BREAK_OVER`` via ``set_break``.
    """
    break_type = self.BREAK_OVER
    self.set_break(break_type)
def step_into(self):
    """Break on the next instruction regardless of level.

    Sets the break type to ``BREAK_INTO`` via ``set_break``.
    """
    break_type = self.BREAK_INTO
    self.set_break(break_type)
def cont(self):
    """Continue the interpreter without breaking.

    Sets the break type to ``BREAK_NONE`` via ``set_break``.
    """
    break_type = self.BREAK_NONE
    self.set_break(break_type)
def eval(self, statement, ctxt=None):
    """Eval a single statement (something returnable).

    Debugger breaks are disabled while the statement runs and are always
    re-enabled afterwards, even if the statement fails to parse.

    :param statement: the statement text; a trailing ``;`` is added if missing
    :param ctxt: accepted for compatibility; currently unused (the
        interpreter's current context is used)
    :returns: the result of the last node handled, or the value carried by an
        ``InterpReturn`` raised while handling the statement
    """
    self._no_debug = True
    try:
        statement = statement.strip()
        if not statement.endswith(";"):
            statement += ";"

        # parsing happens inside the try so that a parse error cannot leave
        # _no_debug stuck at True
        ast = self._parse_string(statement, predefines=False)
        self._dlog("evaluating statement: {}".format(statement))

        try:
            res = None
            for child in ast.children():
                res = self._handle_node(
                    child, self._scope, self._ctxt, self._stream,
                )
            return res
        except errors.InterpReturn as e:
            return e.value
    finally:
        self._no_debug = False
def set_break(self, break_type):
    """Set if (and how) the interpreter should break.

    Also records the current scope level as the break level.

    :param break_type: one of ``BREAK_NONE``, ``BREAK_OVER``, ``BREAK_INTO``
    :returns: None
    """
    self._break_type = break_type
    current_level = self._scope.level()
    self._break_level = current_level
def get_curr_lines(self):
    """Return the current line number in the template,
    as well as the surrounding source lines.

    :returns: ``(line, lines)`` where ``lines`` is a list of
        ``(index, text)`` pairs taken from the template's lines
    """
    curr_line = self._coord.line
    first = max(0, curr_line - 5)
    last = min(len(self._template_lines), curr_line + 4)

    surrounding = []
    for idx in six.moves.range(first, last, 1):
        surrounding.append((idx, self._template_lines[idx]))

    return self._coord.line, surrounding
def set_bitfield_padded(self, val):
    """Set if the bitfield input/output stream should be padded.

    The value is stored on the interpreter, on the current stream
    (``padded``) and on the current context (``_pfp__padded_bitfield``).

    :val: True/False
    :returns: None
    """
    self._padded_bitfield = val
    stream, ctxt = self._stream, self._ctxt
    stream.padded = val
    ctxt._pfp__padded_bitfield = val
def set_bitfield_direction(self, val):
    """Set the direction in which bitfields are parsed.

    :param val: ``BITFIELD_DIR_LEFT_RIGHT`` (-1) for left to right,
        ``BITFIELD_DIR_DEFAULT`` (0) for the default, or
        ``BITFIELD_DIR_RIGHT_LEFT`` (1) for right to left
    :returns: None
    """
    self._bitfield_direction = val
def get_bitfield_padded(self):
    """Return if the bitfield input/output stream should be padded.

    :returns: True/False
    """
    padded = self._padded_bitfield
    return padded
def get_bitfield_direction(self):
    """Return the bitfield direction.

    .. note:: This should be applied AFTER taking into account endianness.

    :returns: the value last passed to ``set_bitfield_direction``
        (``BITFIELD_DIR_DEFAULT`` initially)
    """
    direction = self._bitfield_direction
    return direction
def get_filename(self):
    """Return the filename of the data that is currently being parsed.

    :returns: The name of the data file being parsed (the
        ``orig_filename`` given to ``parse``).
    """
    filename = self._orig_filename
    return filename
def get_types(self):
    """Return a types object that will contain all of the typedefd structs'
    classes.

    :returns: Types object

    Example:

        Create a new PNG_CHUNK object from a PNG_CHUNK type that was defined
        in a template: ::

            types = interp.get_types()
            chunk = types.PNG_CHUNK()
    """
    types = PfpTypes(self, self._scope)
    return types
# --------------------
# PRIVATE
# --------------------
def _parse_string(self, string, predefines=True):
    """Parse ``string`` into an AST, optionally preceded by the predefines.

    A predefine that fails to parse is skipped (and noted in the debug log)
    so that one broken predefine does not prevent the template from parsing.

    :param string: the template text to parse
    :param predefines: whether the registered predefines are parsed first and
        their top-level nodes prepended to the result
    :returns: the parsed AST, with ``ext`` extended by the predefines' nodes
    """
    exts = []
    if predefines:
        for idx, predefine in enumerate(self._predefines):
            try:
                ast = py010parser.parse_string(
                    predefine,
                    parser=self._parser,
                    # clear out the scopes for the first one
                    # that we run
                    keep_scopes=(idx != 0),
                )
                exts += ast.ext
            except Exception as e:
                # best effort: skip it, but don't swallow things like
                # KeyboardInterrupt and leave a trace of what was skipped
                self._dlog(
                    "skipping predefine #{} (failed to parse): {}".format(
                        idx, e
                    )
                )

    res = py010parser.parse_string(
        string,
        parser=self._parser,
        # only keep the scopes if we ran the predefines
        keep_scopes=predefines,
    )
    res.ext = exts + res.ext
    return res
def _run(self, keep_successfull):
    """Interpret the parsed 010 AST.

    :param keep_successfull: if true, an error while interpreting does not
        raise; the root is returned with ``_pfp__error`` set to the exception
    :returns: PfpDom
    :raises pfp.errors.PfpError: wrapping any other exception raised while
        interpreting (unless ``keep_successfull`` is true)
    """
    # example self._ast.show():
    #    FileAST:
    #      Decl: data, [], [], []
    #        TypeDecl: data, []
    #          Struct: DATA
    #            Decl: a, [], [], []
    #              TypeDecl: a, []
    #                IdentifierType: ['char']
    #            Decl: b, [], [], []
    #              TypeDecl: b, []
    #                IdentifierType: ['char']
    #            Decl: c, [], [], []
    #              TypeDecl: c, []
    #                IdentifierType: ['char']
    #            Decl: d, [], [], []
    #              TypeDecl: d, []
    #                IdentifierType: ['char']

    self._dlog("interpreting template")

    try:
        # it is important to pass the stream in as the stream
        # may change (e.g. compressed data)
        res = self._handle_node(self._ast, None, None, self._stream)
    except errors.InterpReturn:
        # TODO handle exit/return codes (e.g. return -1)
        res = self._root
    except errors.InterpExit:
        res = self._root
    except Exception as e:
        if keep_successfull:
            # return the root and set _pfp__error
            res = self._root
            res._pfp__error = e
        else:
            exc_obj, exc_tb = sys.exc_info()[1:]
            # the coord is None until a node with a coordinate was handled
            line = self._coord.line if self._coord is not None else "?"
            more_info = "\nException at {}:{}".format(
                self._orig_filename, line
            )
            if len(exc_obj.args) > 0:
                detail = exc_obj.args[0]
                # e.g. KeyError(5): a non-string first argument cannot be
                # concatenated with the rest of the message
                if not isinstance(detail, six.string_types):
                    detail = str(detail)
                message = (
                    exc_obj.__class__.__name__ + ": " + detail + more_info
                )
            else:
                message = more_info
            six.reraise(errors.PfpError, errors.PfpError(message), exc_tb)

    # final drop-in after everything has executed
    if self._break_type != self.BREAK_NONE:
        self.debugger.cmdloop("execution finished")

    types = self.get_types()
    res._pfp__types = types

    return res
def _handle_node(self, node, scope=None, ctxt=None, stream=None):
    """Recursively handle nodes in the 010 AST.

    :node: the node to handle; a ``(name, node)`` tuple is unwrapped and a
        list of nodes is wrapped in a compound statement
    :scope: the scope to use (defaults to the interpreter's scope, which is
        created on first use)
    :ctxt: the context; if None the interpreter's current context is used,
        otherwise it becomes the current context
    :stream: the data stream handed to the node's handler
    :returns: whatever the node's handler returns
    :raises pfp.errors.UnsupportedASTNode: if no handler exists for the node
    """
    if scope is None:
        if self._scope is None:
            self._scope = self._create_scope()
        scope = self._scope

    if ctxt is None and self._ctxt is not None:
        ctxt = self._ctxt
    else:
        self._ctxt = ctxt

    if type(node) is tuple:
        node = node[1]
    # TODO probably a better way to do this...
    # this occurs with if-statements that have a single statement
    # instead of a compound statement (no curly braces)
    elif type(node) is list and all(
        isinstance(item, AST.Node) for item in node
    ):
        compound = AST.Compound(block_items=node, coord=node[0].coord)
        return self._handle_node(compound, scope, ctxt, stream)

    # need to check this so that debugger-eval'd statements
    # don't mess with the current state
    if not self._no_debug:
        self._coord = node.coord

    line = "?" if node.coord is None else node.coord.line
    self._dlog(
        "handling node type {}, line {}".format(node.__class__.__name__, line)
    )
    self._log.inc()

    breakable = self._node_is_breakable(node)
    may_break = (
        breakable
        and not self._no_debug
        and self._break_type != self.BREAK_NONE
    )
    if may_break:
        if self._break_type == self.BREAK_INTO:
            # always break
            self._break_level = self._scope.level()
            self.debugger.cmdloop()
        elif self._break_type == self.BREAK_OVER:
            # only break when level <= _break_level
            if self._scope.level() <= self._break_level:
                self._break_level = self._scope.level()
                self.debugger.cmdloop()

    node_cls = node.__class__
    if node_cls not in self._node_switch:
        raise errors.UnsupportedASTNode(node.coord, node_cls.__name__)

    handler = self._node_switch[node_cls]
    res = handler(node, scope, ctxt, stream)

    self._log.dec()

    return res
def _handle_file_ast(self, node, scope, ctxt, stream):
    """Handle the root of the AST: create the DOM and run all children.

    :node: the file AST node
    :scope: the scope, stored on the new DOM as ``_pfp__scope``
    :ctxt: ignored; replaced by the newly created DOM
    :stream: the data stream the DOM is created with
    :returns: the DOM (also stored as the interpreter's root)
    """
    dom = fields.Dom(stream)
    self._root = ctxt = dom
    ctxt._pfp__scope = scope
    self._root._pfp__name = "__root"
    self._root._pfp__interp = self
    self._dlog(
        "handling file AST with {} children".format(len(node.children()))
    )

    # children may come as (name, node) tuples; keep only the node
    children = [
        child[1] if type(child) is tuple else child
        for child in node.children()
    ]

    def is_hoisted(child):
        return isinstance(
            child, (AST.FuncDef, AST.Typedef)
        ) or is_forward_declared_struct(child)

    # one pass to define all functions. Functions may only live at the
    # top-level (functions may not be nested or contained within structs,
    # if/else statements, or other code block types). aka hoisting
    for child in children:
        if is_hoisted(child):
            self._handle_node(child, scope, ctxt, stream)

    scope.clear_meta()

    # second pass: everything that was not hoisted
    for child in children:
        if not is_hoisted(child):
            self._handle_node(child, scope, ctxt, stream)

    ctxt._pfp__process_fields_metadata()

    return ctxt
def _handle_empty_statement(self, node, scope, ctxt, stream):
"""Handle empty statements
:node: TODO
:scope: TODO
:ctxt: TODO
:stream: TODO
:returns: TODO
"""
self._dlog("handling empty statement")
def _handle_cast(self, node, scope, ctxt, stream):
"""Handle cast nodes
:node: TODO
:scope: TODO
:ctxt: TODO
:stream: TODO
:returns: TODO
"""
self._dlog("handling cast")
to_type = self._handle_node(node.to_type, scope, ctxt, stream)
scope.push_meta("dest_type", to_type)
val_to_cast = self._handle_node(node.expr, scope, ctxt, stream)
scope.pop_meta("dest_type")
res = to_type()
res._pfp__set_value(val_to_cast)
return res
def _handle_typename(self, node, scope, ctxt, stream):
"""TODO: Docstring for _handle_typename
:node: TODO
:scope: TODO
:ctxt: TODO
:stream: TODO
:returns: TODO
"""
self._dlog("handling typename")
return self._handle_node(node.type, scope, ctxt, stream)
def _get_node_name(self, node):
"""Get the name of the node - check for node.name and
node.type.declname. Not sure why the second one occurs
exactly - it happens with declaring a new struct field
with parameters"""
res = getattr(node, "name", None)
if res is None:
return res
if isinstance(res, AST.TypeDecl):
return res.declname
return res
def _handle_decl(self, node, scope, ctxt, stream):
    """Handle declaration nodes.

    The declared type is resolved via ``node.type`` and then, depending
    on the kind of declaration, one of the following happens:

    * forward-declared struct: the type is registered on the scope
    * function parameter (``node.is_func_param``): a
      ``(name, field class)`` tuple is returned without instantiating
    * ``local``/``const``: a field is created without parsing the stream
      and added to the scope as a local
    * function: added to the scope as a local
    * any other named declaration: the field is instantiated from
      ``stream``, added as a child of ``ctxt`` and registered on the
      scope
    * unnamed declaration: nothing further is done

    :node: the declaration node
    :scope: the current scope
    :ctxt: the current context; new fields are added to it as children
    :stream: the data stream fields are created from
    :returns: the resolved field (instance or class), or a
        ``(name, field class)`` tuple for function parameters
    """
    self._dlog("handling decl")

    # metadata handling is deferred: the field is given a callable that
    # runs _handle_metadata when invoked
    metadata_processor = None
    if node.metadata is not None:
        # metadata_info = self._handle_metadata(node, scope, ctxt, stream)
        def process_metadata():
            metadata_info = self._handle_metadata(
                node, scope, ctxt, stream
            )
            return metadata_info

        metadata_processor = process_metadata

    field_name = self._get_node_name(node)
    field = self._handle_node(node.type, scope, ctxt, stream)
    bitsize = None
    bitfield_rw = None
    if getattr(node, "bitsize", None) is not None:
        bitsize = self._handle_node(node.bitsize, scope, ctxt, stream)
        has_prev = len(ctxt._pfp__children) > 0

        bitfield_rw = None
        if has_prev:
            prev = ctxt._pfp__children[-1]
            # if it was a bitfield as well
            # TODO I don't think this will handle multiple bitfield groups in a row.
            # E.g.
            #   char a: 8, b:8;
            #   char c: 8, d:8;
            # reuse the previous field's BitfieldRW only when it can still
            # reserve `bitsize` bits (and, in padded mode, the widths match)
            if (
                isinstance(prev, fields.NumberBase)
                and
                (
                    (
                        self._padded_bitfield
                        and prev.__class__.width == field.width
                    )
                    or not self._padded_bitfield
                )
                and prev.bitsize is not None
                and prev.bitfield_rw.reserve_bits(bitsize, stream)
            ):
                bitfield_rw = prev.bitfield_rw

        # either because there was no previous bitfield, or the previous was full
        if bitfield_rw is None:
            bitfield_rw = fields.BitfieldRW(self, field)
            bitfield_rw.reserve_bits(bitsize, stream)

    if is_forward_declared_struct(node):
        scope.add_type_class(node.type.name, field)

    elif getattr(node, "is_func_param", False):
        # we want to keep this as a class and not instantiate it
        # instantiation will be done in functions.ParamListDef.instantiate
        field = (field_name, field)

    # locals and consts still get a field instance, but DON'T parse the
    # stream!
    elif "local" in node.quals or "const" in node.quals:
        is_struct = issubclass(field, fields.Struct)
        if not isinstance(field, fields.Field) and not is_struct:
            field = field()
        scope.add_local(field_name, field)

        # this should only be able to be done with locals, right?
        # if not, move it to the bottom of the function
        if node.init is not None:
            val = self._handle_node(node.init, scope, ctxt, stream)
            if is_struct:
                # for structs the initializer value itself becomes the local
                field = val
                scope.add_local(field_name, field)
            else:
                field._pfp__set_value(val)

        if "const" in node.quals:
            field._pfp__freeze()

        field._pfp__interp = self

    elif isinstance(field, functions.Function):
        # eh, just add it as a local...
        # maybe the whole local/vars thinking needs to change...
        # and we should only have ONE map TODO
        field.name = field_name
        scope.add_local(field_name, field)

    elif field_name is not None:
        added_child = False

        # by this point, structs are already instantiated (they need to be
        # in order to set the new context)
        if not isinstance(field, fields.Field):
            if issubclass(field, fields.NumberBase):
                # use the default bitfield direction
                if self._bitfield_direction is self.BITFIELD_DIR_DEFAULT:
                    bitfield_left_right = (
                        True
                        if field.endian == fields.BIG_ENDIAN
                        else False
                    )
                else:
                    bitfield_left_right = (
                        self._bitfield_direction
                        is self.BITFIELD_DIR_LEFT_RIGHT
                    )

                field = field(
                    stream,
                    bitsize=bitsize,
                    metadata_processor=metadata_processor,
                    bitfield_rw=bitfield_rw,
                    bitfield_padded=self._padded_bitfield,
                    bitfield_left_right=bitfield_left_right,
                )

            # TODO
            # for now if there's a struct inside of a union that is being
            # parsed when there's an error, the user will lose information
            # about how far the parsing got. Here we are explicitly checking for
            # adding structs and unions to a parent union.
            elif (
                (
                    issubclass(field, fields.Struct)
                    or issubclass(field, fields.Union)
                )
                and not isinstance(ctxt, fields.Union)
                and hasattr(field, "_pfp__init")
            ):
                # this is so that we can have all nested structs added to
                # the root DOM, even if there's an error in parsing the data.
                # If we didn't do this, any errors parsing the data would cause
                # the new struct to not be added to its parent, and the user would
                # not be able to see how far the script got
                field = field(
                    stream,
                    metadata_processor=metadata_processor,
                    do_init=False,
                )
                field._pfp__interp = self
                field_res = ctxt._pfp__add_child(field_name, field, stream)

                # when adding a new field to a struct/union/fileast, add it to the
                # root of the ctxt's scope so that it doesn't get lost by being declared
                # from within a function
                scope.add_var(field_name, field_res, root=True)

                field_res._pfp__interp = self
                # initialization is run only after the field is attached to ctxt
                field._pfp__init(stream)
                added_child = True
            else:
                field = field(
                    stream, metadata_processor=metadata_processor
                )

        if not added_child:
            field._pfp__interp = self
            field_res = ctxt._pfp__add_child(field_name, field, stream)
            field_res._pfp__interp = self

            # when adding a new field to a struct/union/fileast, add it to the
            # root of the ctxt's scope so that it doesn't get lost by being declared
            # from within a function
            scope.add_var(field_name, field_res, root=True)

            # this shouldn't be used elsewhere, but should still be explicit with
            # this flag
            added_child = True

    # enums will get here. If there is no name, then no
    # field is being declared (but the enum values _will_
    # get defined). E.g.:
    # enum <uchar blah {
    #   BLAH1,
    #   BLAH2,
    #   BLAH3
    # };
    elif field_name is None:
        pass

    return field
def _handle_metadata(self, node, scope, ctxt, stream):
    """Handle metadata for the node.

    Watch metadata is processed when a ``watch`` or ``update`` key is
    present; packed metadata is processed when a ``packtype`` or
    ``packer`` key is present.

    :returns: list with one info dict per metadata kind that was handled
    """
    self._dlog("handling node metadata {}".format(node.metadata.keyvals))

    keyvals = node.metadata.keyvals
    # (trigger keys, handler) pairs, in the order they must be processed
    handlers = (
        (("watch", "update"), self._handle_watch_metadata),
        (("packtype", "packer"), self._handle_packed_metadata),
    )

    metadata_info = []
    for trigger_keys, handler in handlers:
        if any(key in keyvals for key in trigger_keys):
            metadata_info.append(handler(node, scope, ctxt, stream))
    return metadata_info
# char blah[60] <pack=Zip, unpack=Unzip, packtype=DataType>;
# char blah[60] <packer=Zip, packtype=DataType>;
# int checksum <watch=field1;field2;field3, update=Crc32>;
def _handle_watch_metadata(self, node, scope, ctxt, stream):
    """Handle watch metadata for a field.

    Both the ``watch`` and ``update`` keys are required. ``watch`` is a
    ``;``-separated list of expressions, each of which is evaluated with
    ``self.eval``; ``update`` names a function that is looked up in
    ``scope``.

    :node: the declaration node carrying the metadata
    :scope: the current scope (used to resolve the update function)
    :ctxt: the current context
    :stream: the data stream
    :returns: dict with ``type``, ``watch_fields``, ``update_func`` and
        ``func_call_info`` keys
    :raises errors.PfpError: if ``watch`` or ``update`` is missing
    """
    keyvals = node.metadata.keyvals
    if "watch" not in keyvals:
        raise errors.PfpError(
            "Watched fields require the fields to watch to be set"
        )
    if "update" not in keyvals:
        raise errors.PfpError(
            "Watched fields require an update function to be set"
        )

    watch_fields = [
        self.eval(watch_name.strip())
        for watch_name in keyvals["watch"].split(";")
    ]
    update_func = scope.get_id(keyvals["update"])

    return {
        "type": "watch",
        "watch_fields": watch_fields,
        "update_func": update_func,
        "func_call_info": (ctxt, scope, stream, self, self._coord),
    }
def _handle_packed_metadata(self, node, scope, ctxt, stream):
    """Handle packed metadata for a field.

    Either a single ``packer`` function or both ``pack`` and ``unpack``
    functions must be named, together with a ``packtype``. When
    ``packer`` is present it takes precedence over ``pack``/``unpack``.

    :returns: dict describing the packing functions and pack type
    :raises errors.PfpError: if the required keys are missing
    """
    keyvals = node.metadata.keyvals
    has_packer = "packer" in keyvals
    has_pack_pair = "pack" in keyvals and "unpack" in keyvals

    if not (has_packer or has_pack_pair):
        raise errors.PfpError(
            "Packed fields require a packer function to be set or pack and unpack functions to be set"
        )
    if "packtype" not in keyvals:
        raise errors.PfpError("Packed fields require a packtype to be set")

    if has_packer:
        info = {"packer": scope.get_id(keyvals["packer"])}
    else:
        # the guard above guarantees both pack and unpack are present
        info = {
            "pack": scope.get_id(keyvals["pack"]),
            "unpack": scope.get_id(keyvals["unpack"]),
        }

    info["pack_type"] = scope.get_type(keyvals["packtype"])
    info["type"] = "packed"
    info["func_call_info"] = (ctxt, scope, stream, self, self._coord)
    return info
def _handle_byref_decl(self, node, scope, ctxt, stream):
    """Handle by-reference declarations.

    The underlying type (``node.type.type``) is resolved and wrapped in
    a ``functions.ParamClsWrapper`` whose ``byref`` flag is set.

    :returns: the wrapped field class, flagged as by-reference
    """
    self._dlog("handling byref decl")
    underlying = self._handle_node(node.type.type, scope, ctxt, stream)

    # see issue #35 - the class is wrapped so that the byref flag does not
    # permanently stay on the class itself. The flag is not really used
    # (maybe except for introspection) with byref function params.
    wrapped = functions.ParamClsWrapper(underlying)
    wrapped.byref = True
    return wrapped
def _handle_type_decl(self, node, scope, ctxt, stream):
    """Handle type declaration nodes by delegating to ``node.type``.

    :node: the type declaration node
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: whatever handling ``node.type`` returns
    """
    self._dlog("handling type decl")
    return self._handle_node(node.type, scope, ctxt, stream)
def _handle_struct_ref(self, node, scope, ctxt, stream):
    """Handle struct member references (``name.field``).

    ``node.name`` is evaluated and the attribute named by
    ``node.field.name`` is fetched from the result. If that fails on an
    implicit array, the attribute is fetched from the array's last item
    instead.

    :returns: the referenced sub field
    :raises AttributeError: if the member cannot be resolved
    """
    self._dlog("handling struct ref")

    container = self._handle_node(node.name, scope, ctxt, stream)
    try:
        return getattr(container, node.field.name)
    except AttributeError:
        # should be able to access implicit array items by index OR
        # access the last one's members directly without index
        #
        # E.g.:
        #
        #   local int total_length = 0;
        #   while(!FEof()) {
        #       HEADER header;
        #       total_length += header.length;
        #   }
        if not (isinstance(container, fields.Array) and container.implicit):
            raise
        return getattr(container[-1], node.field.name)
def _handle_union(self, node, scope, ctxt, stream):
    """Handle union nodes.

    :node: the union node
    :scope: the current scope (unused)
    :ctxt: the current context (unused)
    :stream: the data stream (unused)
    :returns: the result of ``StructUnionDef("union", self, node)``
    """
    self._dlog("handling union")
    return StructUnionDef("union", self, node)
def _handle_union_decls(self, node, scope, ctxt, stream):
    """Handle the declarations inside a union.

    A child scope is created and stored on ``ctxt``, every declaration
    is handled in it, and afterwards the stream is advanced by the
    width of ``ctxt``.

    :returns: None
    """
    self._dlog("handling union decls")

    # new scope
    # NOTE(review): unlike _handle_struct_decls, self._scope is not pointed
    # at the new scope on entry, only restored on exit - confirm intended
    union_scope = ctxt._pfp__scope = Scope(self._log, parent=scope)

    try:
        for member_decl in node.decls:
            self._handle_node(member_decl, union_scope, ctxt, stream)
            union_scope.clear_meta()
    finally:
        # the union will have reset the stream
        stream.seek(stream.tell() + ctxt._pfp__width(), 0)
        self._scope = union_scope._parent
def _handle_init_list(self, node, scope, ctxt, stream):
    """Handle InitList nodes (e.g. when initializing a struct).

    :node: the init list node
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: list with the handled value of each child, in order
    """
    self._dlog("handling init list")
    return [
        self._handle_node(init_child, scope, ctxt, stream)
        for _, init_child in node.children()
    ]
def _handle_struct_call_type_decl(self, node, scope, ctxt, stream):
    """Handle a struct type that is declared with arguments.

    The struct class (``node.type``) and then the arguments
    (``node.args``) are resolved and combined by
    ``StructDeclWithParams``.

    :returns: the class produced by ``StructDeclWithParams``
    """
    self._dlog("handling struct with parameters")

    base_cls = self._handle_node(node.type, scope, ctxt, stream)
    call_args = self._handle_node(node.args, scope, ctxt, stream)
    return StructDeclWithParams(scope, base_cls, call_args)
def _handle_struct(self, node, scope, ctxt, stream):
    """Handle struct nodes.

    When the node carries declarations, a new struct class is defined
    (and registered on the scope if it is named). Otherwise the node
    refers to a struct by name; the type is looked up on the scope and
    only defined from the node if the lookup finds nothing.

    :returns: the struct class
    """
    self._dlog("handling struct")

    if node.args is not None:
        for param in node.args.params:
            param.is_func_param = True

    if node.decls is None:
        # it's declaring a struct field. E.g.
        #   struct IFD subDir;
        known_cls = scope.get_type(node.name)
        if known_cls is None:
            known_cls = StructUnionDef(node.name, self, node)
        return known_cls

    struct_cls = StructUnionDef("struct", self, node)
    if node.name is not None:
        scope.add_type_class(node.name, struct_cls)
    return struct_cls
def _handle_struct_decls(self, node, scope, ctxt, stream):
    """Handle the declarations inside a struct.

    A child scope is created, stored on ``ctxt`` and made the
    interpreter's current scope while the declarations are handled; the
    parent scope is restored afterwards, even on error.

    :returns: None
    """
    self._dlog("handling struct decls")

    # new scope
    struct_scope = Scope(self._log, parent=scope)
    ctxt._pfp__scope = struct_scope
    self._scope = struct_scope

    try:
        for member_decl in node.decls:
            # new context! (struct)
            self._handle_node(member_decl, struct_scope, ctxt, stream)
            struct_scope.clear_meta()

        ctxt._pfp__process_fields_metadata()
    finally:
        # return statements and other exceptions may unwind through here,
        # so the scope is always popped
        self._scope = struct_scope._parent
def _handle_identifier_type(self, node, scope, ctxt, stream):
    """Handle identifier type nodes.

    :node: the identifier type node (``names`` is read)
    :scope: the current scope, used to resolve typedefs
    :ctxt: the current context (unused)
    :stream: the data stream (unused)
    :returns: the field class that ``node.names`` resolves to
    """
    self._dlog("handling identifier")
    return self._resolve_to_field_class(node.names, scope)
def _handle_typedef(self, node, scope, ctxt, stream):
    """Handle typedef nodes by registering the new type name on the scope.

    Struct/union typedefs are registered directly (anonymous) or as a
    reference to the named struct/union. Enum and array typedefs are
    resolved to a class first. Anything else is registered by its list
    of type names.

    :returns: None
    """
    aliased = node.type.type
    aliased_kind = aliased.__class__

    if aliased_kind in (AST.Union, AST.Struct):
        self._dlog("handling typedef struct/union '{}'".format(node.name))
        if aliased.name is None:
            scope.add_type_struct_or_union(node.name, self, aliased)
        else:
            scope.add_refd_struct_or_union(
                node.name, aliased.name, self, aliased
            )
    elif aliased_kind is AST.Enum or isinstance(node.type, AST.ArrayDecl):
        # for arrays this does not parse data, it just creates the
        # ArrayDecl class
        resolved_cls = self._handle_node(node.type, scope, ctxt, stream)
        scope.add_type_class(node.name, resolved_cls)
    else:
        names = aliased.names
        self._dlog("handling typedef '{}' ({})".format(node.name, names))
        # don't actually handle the TypeDecl and Identifier nodes,
        # just directly add the types. Example structure:
        #
        #   Typedef: BLAH, [], ['typedef']
        #       TypeDecl: BLAH, []
        #           IdentifierType: ['unsigned', 'char']
        #
        scope.add_type(node.name, names)
def _str_to_int(self, string):
"""Check for the hex
"""
string = string.lower()
if string.endswith("l"):
string = string[:-1]
if string.lower().startswith("0x"):
# should always match
match = re.match(r"0[xX]([a-fA-F0-9]+)", string)
return int(match.group(1), 0x10)
else:
return int(string)
def _choose_const_int_class(self, val):
    """Pick the first integer field class whose range contains ``val``.

    Ranges are tried in the order signed 32-bit, unsigned 32-bit,
    signed 64-bit, unsigned 64-bit. The signed ranges include their
    minimum value (``-2**31`` and ``-2**63``).

    :val: the integer value of the constant
    :returns: ``fields.Int``, ``fields.UInt``, ``fields.Int64`` or
        ``fields.UInt64``; None when ``val`` fits none of the ranges
    """
    if -0x80000000 <= val < 0x80000000:
        return fields.Int
    if 0 <= val < 0x100000000:
        return fields.UInt
    if -0x8000000000000000 <= val < 0x8000000000000000:
        return fields.Int64
    if 0 <= val < 0x10000000000000000:
        return fields.UInt64
    # NOTE(review): values outside every range yield None, as before;
    # callers that instantiate the result will fail on it
    return None
def _handle_constant(self, node, scope, ctxt, stream):
    """Handle constant nodes.

    ``node.value`` (the literal text) is converted to a Python value
    according to ``node.type`` and stored in a new field instance.

    :node: the constant node (``type``, ``value`` and ``coord`` are read)
    :scope: the current scope (unused)
    :ctxt: the current context (unused)
    :stream: the data stream (unused)
    :returns: a new field holding the constant's value
    :raises errors.UnsupportedConstantType: if ``node.type`` is not one
        of int, long, float, double, char or string
    """
    self._dlog("handling constant type {}".format(node.type))

    # constant type -> (text converter, field class or class chooser)
    switch = {
        "int": (self._str_to_int, self._choose_const_int_class),
        "long": (self._str_to_int, self._choose_const_int_class),
        # TODO this isn't quite right, but py010parser wouldn't have
        # parsed it if it wasn't correct...
        "float": (
            lambda x: float(x.lower().replace("f", "")),
            fields.Float,
        ),
        "double": (float, fields.Double),
        # cut out the quotes
        "char": (lambda x: ord(utils.string_escape(x[1:-1])), fields.Char),
        # TODO should this be unicode?? will probably bite me later...
        # cut out the quotes
        "string": (
            lambda x: str(utils.string_escape(x[1:-1])),
            fields.String,
        ),
    }

    if node.type not in switch:
        # qualified through the errors module like every other raise here;
        # the bare name is not among the imports visible at the top of the file
        raise errors.UnsupportedConstantType(node.coord, node.type)

    conversion, field_cls = switch[node.type]
    val = conversion(node.value)

    # the int/long entries hold a chooser method rather than a class;
    # call it with the value to get the actual field class
    if hasattr(field_cls, "__call__") and not type(field_cls) is type:
        field_cls = field_cls(val)

    field = field_cls()
    field._pfp__set_value(val)
    return field
def _handle_binary_op(self, node, scope, ctxt, stream):
"""TODO: Docstring for _handle_binary_op.
:node: TODO
:scope: TODO
:ctxt: TODO
:stream: TODO
:returns: TODO
"""
self._dlog("handling binary operation {}".format(node.op))
switch = {
"+": lambda x, y: x + y,
"-": lambda x, y: x - y,
"*": lambda x, y: x * y,
"/": lambda x, y: x / y,
"|": lambda x, y: x | y,
"^": lambda x, y: x ^ y,
"&": lambda x, y: x & y,
"%": lambda x, y: x % y,
">": lambda x, y: x > y,
"<": lambda x, y: x < y,
"||": lambda x, y: 1 if x or y else 0,
">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y,
"==": lambda x, y: x == y,
"!=": lambda x, y: x != y,
"&&": lambda x, y: 1 if x and y else 0,
">>": lambda x, y: x >> y,
"<<": lambda x, y: x << y,
}
dest_type = scope.get_meta("dest_type")
left_val = self._handle_node(node.left, scope, ctxt, stream)
if dest_type is not None and not isinstance(left_val, dest_type):
new_left_val = dest_type()
new_left_val._pfp__set_value(left_val)
left_val = new_left_val
# short circuit power!
if node.op == "||" and left_val:
res = 1
else:
right_val = self._handle_node(node.right, scope, ctxt, stream)
if dest_type is not None and not isinstance(right_val, dest_type):
new_right_val = dest_type()
new_right_val._pfp__set_value(right_val)
right_val = new_right_val
if node.op not in switch:
raise errors.UnsupportedBinaryOperator(node.coord, node.op)
res = switch[node.op](left_val, right_val)
if type(res) is bool:
new_res = fields.Int()
if res:
new_res._pfp__set_value(1)
else:
new_res._pfp__set_value(0)
res = new_res
return res
def _handle_unary_op(self, node, scope, ctxt, stream):
    """Handle unary operator nodes.

    ``parentof``, ``exists``, ``function_exists`` and the postfix
    ``++``/``--`` operators are delegated to dedicated handlers that
    receive the raw node. All other operators work on the evaluated
    operand.

    :returns: the result of the operation; Python bool results are
        converted to a field of the operand's class holding 1 or 0
    :raises errors.UnsupportedUnaryOperator: for unknown operators
    """
    self._dlog("handling unary op {}".format(node.op))

    # operators whose handlers need the unevaluated node
    node_level_ops = {
        "parentof": self._handle_parentof,
        "exists": self._handle_exists,
        "function_exists": self._handle_function_exists,
        "p++": self._handle_post_plus_plus,
        "p--": self._handle_post_minus_minus,
    }
    if node.op in node_level_ops:
        return node_level_ops[node.op](node, scope, ctxt, stream)

    # operators applied to the evaluated operand
    value_level_ops = {
        # for ++i and --i
        "++": lambda x: x.__iadd__(1),
        "--": lambda x: x.__isub__(1),
        "~": lambda x: ~x,
        "!": lambda x: not x,
        "-": lambda x: -x,
        "sizeof": lambda x: (fields.UInt64() + x._pfp__width()),
        "startof": lambda x: (fields.UInt64() + x._pfp__offset),
    }
    if node.op not in value_level_ops:
        raise errors.UnsupportedUnaryOperator(node.coord, node.op)

    operand = self._handle_node(node.expr, scope, ctxt, stream)
    if type(operand) is type:
        # a class was returned; operate on an instance of it
        operand = operand()

    res = value_level_ops[node.op](operand)
    if type(res) is bool:
        as_field = operand.__class__()
        as_field._pfp__set_value(1 if res else 0)
        res = as_field
    return res
def _handle_post_plus_plus(self, node, scope, ctxt, stream):
    """Handle the postfix ``++`` operator.

    :returns: a new field of the same class holding the value from
        before the operand was incremented
    """
    target = self._handle_node(node.expr, scope, ctxt, stream)

    # snapshot the current value before incrementing in place
    before = target.__class__()
    before._pfp__set_value(target)

    target += 1
    return before
def _handle_post_minus_minus(self, node, scope, ctxt, stream):
    """Handle the postfix ``--`` operator.

    :returns: a new field of the same class holding the value from
        before the operand was decremented
    """
    target = self._handle_node(node.expr, scope, ctxt, stream)

    # snapshot the current value before decrementing in place
    before = target.__class__()
    before._pfp__set_value(target)

    target -= 1
    return before
def _handle_parentof(self, node, scope, ctxt, stream):
    """Handle the parentof unary operator.

    :node: the unary operator node (``expr`` is evaluated)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: the ``_pfp__parent`` of the evaluated operand
    """
    # if someone does something like parentof(this).blah,
    # we'll end up with a StructRef instead of an ID ref
    # for node.expr, but we'll also end up with a structref
    # if the user does parentof(a.b.c)...
    #
    # TODO how to differentiate between the two??
    #
    # the proper way would be to do (parentof(a.b.c)).a or
    # (parentof a.b.c).a
    operand = self._handle_node(node.expr, scope, ctxt, stream)
    return operand._pfp__parent
def _handle_exists(self, node, scope, ctxt, stream):
    """Handle the exists unary operator.

    The operand is evaluated; if that succeeds the result is 1. A
    missing member (``AttributeError``) or an identifier that cannot be
    resolved (``errors.UnresolvedID``) yields 0 instead of aborting,
    matching how ``_handle_function_exists`` treats unresolved ids.

    :node: the unary operator node (``expr`` is evaluated)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: a ``fields.Int`` holding 1 or 0
    """
    res = fields.Int()
    try:
        self._handle_node(node.expr, scope, ctxt, stream)
    except (AttributeError, errors.UnresolvedID):
        res._pfp__set_value(0)
    else:
        res._pfp__set_value(1)
    return res
def _handle_function_exists(self, node, scope, ctxt, stream):
    """Handle the function_exists unary operator.

    :node: the unary operator node (``expr`` is evaluated)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: a ``fields.Int`` holding 1 when the operand resolves to a
        ``functions.BaseFunction``, 0 when it resolves to anything else
        or cannot be resolved
    """
    res = fields.Int()
    try:
        candidate = self._handle_node(node.expr, scope, ctxt, stream)
        is_function = isinstance(candidate, functions.BaseFunction)
    except errors.UnresolvedID:
        is_function = False

    res._pfp__set_value(1 if is_function else 0)
    return res
def _handle_id(self, node, scope, ctxt, stream):
    """Handle an ID node (return a field object for the ID).

    ``__root`` resolves to the interpreter's root and ``__this``/``this``
    to the current context. Nodes flagged ``is_lazy`` always resolve to
    a ``LazyField``; everything else is looked up in the scope.

    :returns: the resolved field, the root, the context or a LazyField
    :raises errors.UnresolvedID: if a non-lazy id is not in the scope
    """
    ident = node.name
    if ident == "__root":
        return self._root
    if ident in ("__this", "this"):
        return ctxt

    self._dlog("handling id {}".format(node.name))
    field = scope.get_id(node.name)

    if getattr(node, "is_lazy", False):
        return LazyField(node.name, scope)
    if field is None:
        raise errors.UnresolvedID(node.coord, node.name)
    return field
def _handle_assignment(self, node, scope, ctxt, stream):
    """Handle assignment nodes.

    The lvalue is resolved to a field, its class is published as the
    ``dest_type`` meta value on the scope, and the rvalue is evaluated.
    The field is then updated in place according to ``node.op``.

    :returns: the assigned-to field
    :raises errors.UnsupportedAssignmentOperator: for unknown operators
    """
    # every helper updates the target field in place; rebinding the
    # local name inside the helper is irrelevant to the caller
    def _assign(target, val):
        target._pfp__set_value(val)

    def _add(target, val):
        target += val

    def _sub(target, val):
        target -= val

    def _mul(target, val):
        target *= val

    def _div(target, val):
        target.__idiv__(val)

    def _mod(target, val):
        target %= val

    def _xor(target, val):
        target ^= val

    def _and(target, val):
        target &= val

    def _or(target, val):
        target |= val

    def _lshift(target, val):
        target <<= val

    def _rshift(target, val):
        target >>= val

    in_place_ops = {
        "=": _assign,
        "+=": _add,
        "-=": _sub,
        "*=": _mul,
        "/=": _div,
        "%=": _mod,
        "^=": _xor,
        "&=": _and,
        "|=": _or,
        "<<=": _lshift,
        ">>=": _rshift,
    }

    scope.clear_meta()
    self._dlog("handling assignment")

    field = self._handle_node(node.lvalue, scope, ctxt, stream)
    self._dlog("field = {}".format(field))

    scope.push_meta("dest_type", field._pfp__get_class())
    value = self._handle_node(node.rvalue, scope, ctxt, stream)

    if node.op is None:
        self._dlog("value = {}".format(value))
        field._pfp__set_value(value)
        return field

    self._dlog("value {}= {}".format(node.op, value))
    apply_op = in_place_ops.get(node.op)
    if apply_op is None:
        raise errors.UnsupportedAssignmentOperator(node.coord, node.op)
    apply_op(field, value)
    return field
def _handle_func_def(self, node, scope, ctxt, stream):
    """Handle FuncDef nodes.

    The declaration (``node.decl``) is handled first; the function it
    produces then receives ``node.body`` as its body.

    :returns: None
    """
    self._dlog("handling function definition")
    declared_func = self._handle_node(node.decl, scope, ctxt, stream)
    declared_func.body = node.body
def _handle_param_list(self, node, scope, ctxt, stream):
    """Handle ParamList nodes.

    Each parameter is first passed to ``_mark_id_as_lazy`` and then
    handled, one parameter at a time.

    :returns: a ``functions.ParamListDef`` built from the handled params
    """
    self._dlog("handling param list")

    def resolve(param):
        self._mark_id_as_lazy(param)
        return self._handle_node(param, scope, ctxt, stream)

    # params should be a list of tuples:
    # [(<name>, <field_class>), ...]
    resolved_params = [resolve(param) for param in node.params]
    return functions.ParamListDef(resolved_params, node.coord)
def _handle_func_decl(self, node, scope, ctxt, stream):
    """Handle FuncDecl nodes.

    The parameter list (empty when the declaration has no args) and the
    return type are resolved and combined into a ``functions.Function``.

    :returns: the new ``functions.Function``
    """
    self._dlog("handling func decl")

    if node.args is None:
        params = functions.ParamListDef([], node.coord)
    else:
        # could just call _handle_param_list directly...
        for param in node.args.params:
            # see the check in _handle_decl for how this is kept from
            # being added to the local context/scope
            param.is_func_param = True
        params = self._handle_node(node.args, scope, ctxt, stream)

    return_type = self._handle_node(node.type, scope, ctxt, stream)
    return functions.Function(return_type, params, scope)
def _handle_func_call(self, node, scope, ctxt, stream):
    """Handle FuncCall nodes.

    The arguments are evaluated before the callee is resolved.

    :returns: the result of calling the resolved function
    """
    self._dlog("handling function call to '{}'".format(node.name.name))

    call_args = (
        []
        if node.args is None
        else self._handle_node(node.args, scope, ctxt, stream)
    )
    callee = self._handle_node(node.name, scope, ctxt, stream)
    return callee.call(call_args, ctxt, scope, stream, self, node.coord)
def _handle_expr_list(self, node, scope, ctxt, stream):
    """Handle ExprList nodes.

    :node: the expression list node (``exprs`` is read)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: list with the handled value of each expression, in order
    """
    self._dlog("handling expression list")

    results = []
    for expr in node.exprs:
        results.append(self._handle_node(expr, scope, ctxt, stream))
    return results
def _handle_compound(self, node, scope, ctxt, stream):
    """Handle Compound nodes.

    Every child is handled in order, with the scope's meta values
    cleared before each one. No new scope is pushed.

    :returns: None
    """
    self._dlog("handling compound statement")

    # exceptions (returns are implemented by raising one) simply
    # propagate; there is no scope to pop here
    for child in node.children():
        scope.clear_meta()
        self._handle_node(child, scope, ctxt, stream)
def _handle_return(self, node, scope, ctxt, stream):
    """Handle Return nodes.

    Never returns normally: the (optional) return value is delivered by
    raising ``errors.InterpReturn``.

    :raises errors.InterpReturn: always, carrying the return value
        (None when the statement has no expression)
    """
    self._dlog("handling return")

    ret_val = None
    if node.expr is not None:
        ret_val = self._handle_node(node.expr, scope, ctxt, stream)

    self._dlog("return value = {}".format(ret_val))
    raise errors.InterpReturn(ret_val)
def _handle_enum(self, node, scope, ctxt, stream):
    """Handle enum nodes

    Every enumerator is added to ``scope`` as a frozen local. The
    resulting enum class is created with ``EnumDef`` and is registered
    on the scope only when the enum has a name.

    :node: the enum node (``type``, ``values`` and ``name`` are read)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: the enum class created by ``EnumDef``
    """
    self._dlog("handling enum")
    # the underlying type defaults to fields.Int when none is given
    if node.type is None:
        enum_cls = fields.Int
    else:
        enum_cls = self._handle_node(node.type, scope, ctxt, stream)

    # maps name -> value field as well as python value -> name
    enum_vals = {}
    curr_val = enum_cls()
    curr_val._pfp__value = 0
    prev_val = None
    for enumerator in node.values.enumerators:
        if enumerator.value is not None:
            # explicit value: evaluate it and store it in a fresh field
            curr_val_parsed = self._handle_node(
                enumerator.value, scope, ctxt, stream
            )
            curr_val = enum_cls()
            curr_val._pfp__set_value(curr_val_parsed._pfp__value)
        elif prev_val is not None:
            # implicit value: one more than the previous enumerator
            curr_val = prev_val + 1
            curr_val.signed = enum_cls.signed

        curr_val._pfp__freeze()
        enum_vals[enumerator.name] = curr_val
        enum_vals[fields.PYVAL(curr_val)] = enumerator.name
        scope.add_local(enumerator.name, curr_val)
        prev_val = curr_val

    if node.name is not None:
        enum_cls = EnumDef(node.name, enum_cls, enum_vals)
        scope.add_type_class(node.name, enum_cls)
    else:
        enum_cls = EnumDef(
            "enum_" + enum_cls.__name__, enum_cls, enum_vals
        )
        # don't add to scope if we don't have a name
    return enum_cls
def _handle_array_decl(self, node, scope, ctxt, stream):
    """Handle ArrayDecl nodes.

    Only the array declaration is built here; no data is parsed. The
    dimension is evaluated (it stays None when the node has none), then
    the element type is resolved.

    :returns: an ``ArrayDecl`` named after the declaration
    """
    decl_name = node.type.declname
    self._dlog("handling array declaration '{}'".format(decl_name))

    array_size = None
    if node.dim is not None:
        array_size = self._handle_node(node.dim, scope, ctxt, stream)
    self._dlog("array size = {}".format(array_size))

    # TODO node.dim_quals
    element_cls = self._handle_node(node.type, scope, ctxt, stream)
    self._dlog("field class = {}".format(element_cls))

    array = ArrayDecl(element_cls, array_size)
    array._pfp__name = decl_name
    return array
def _handle_array_ref(self, node, scope, ctxt, stream):
    """Handle ArrayRef nodes.

    The array expression is evaluated before the subscript.

    :returns: the item at the subscript's python value
    """
    array = self._handle_node(node.name, scope, ctxt, stream)
    index_field = self._handle_node(node.subscript, scope, ctxt, stream)
    index = fields.PYVAL(index_field)
    return array[index]
def _handle_if(self, node, scope, ctxt, stream):
    """Handle If nodes (also used for the ternary operator).

    :returns: the result of the branch that was taken; None when the
        condition is false and there is no false branch
    """
    self._dlog("handling if/ternary_op")

    if self._handle_node(node.cond, scope, ctxt, stream):
        # there should always be an iftrue
        return self._handle_node(node.iftrue, scope, ctxt, stream)

    if node.iffalse is None:
        return None
    return self._handle_node(node.iffalse, scope, ctxt, stream)
def _handle_for(self, node, scope, ctxt, stream):
    """Handle For nodes.

    A missing condition counts as always true. ``break`` ends the loop
    immediately; ``continue`` skips the rest of the body but the "next"
    statement is still run.

    :returns: None
    """
    self._dlog("handling for")

    if node.init is not None:
        # perform the init
        self._handle_node(node.init, scope, ctxt, stream)

    while True:
        if node.cond is not None and not self._handle_node(
            node.cond, scope, ctxt, stream
        ):
            break

        if node.stmt is not None:
            try:
                # do the for body
                self._handle_node(node.stmt, scope, ctxt, stream)
            except errors.InterpBreak:
                break
            except errors.InterpContinue:
                # fall through so the "next" statement is interpreted
                pass

        if node.next is not None:
            # do the next statement
            self._handle_node(node.next, scope, ctxt, stream)
def _handle_while(self, node, scope, ctxt, stream):
    """Handle While nodes.

    A missing condition counts as always true. ``break`` ends the loop;
    ``continue`` moves on to the next evaluation of the condition.

    :returns: None
    """
    self._dlog("handling while")

    while True:
        if node.cond is not None and not self._handle_node(
            node.cond, scope, ctxt, stream
        ):
            break

        if node.stmt is None:
            continue

        try:
            self._handle_node(node.stmt, scope, ctxt, stream)
        except errors.InterpBreak:
            break
        except errors.InterpContinue:
            pass
def _handle_do_while(self, node, scope, ctxt, stream):
    """Handle DoWhile nodes.

    The body runs before the condition is checked. ``break`` ends the
    loop; ``continue`` proceeds to the condition check. A missing
    condition counts as always true.

    :returns: None
    """
    self._dlog("handling do while")

    while True:
        if node.stmt is not None:
            try:
                self._handle_node(node.stmt, scope, ctxt, stream)
            except errors.InterpBreak:
                break
            except errors.InterpContinue:
                pass

        if node.cond is None:
            continue
        if not self._handle_node(node.cond, scope, ctxt, stream):
            break
def _flatten_list(self, l):
for el in l:
if isinstance(el, list) and not isinstance(el, AST.Node):
for sub in self._flatten_list(el):
yield sub
else:
yield el
def _handle_switch(self, node, scope, ctxt, stream):
    """Handle Switch nodes

    Case expressions are compared against the switch condition in
    order; the first matching case is executed, falling through into
    the following cases until a break occurs. If no case matches and a
    default exists, execution starts at the default instead.

    :node: the switch node
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: None
    """
    def exec_case(idx, cases):
        # keep executing cases until a break is found,
        # or they've all been executed
        for case in cases[idx:]:
            stmts = case.stmts
            try:
                for stmt in stmts:
                    self._handle_node(stmt, scope, ctxt, stream)
            except errors.InterpBreak as e:
                break

    def get_stmts(stmts, res=None):
        # collect every statement, descending into the statements held by
        # Case/Default nodes, into one flat list
        if res is None:
            res = []

        stmts = self._flatten_list(stmts)
        for stmt in stmts:
            # children() may hand back (name, node) tuples
            if isinstance(stmt, tuple):
                stmt = stmt[1]

            res.append(stmt)

            if stmt.__class__ in [AST.Case, AST.Default]:
                get_stmts(stmt.stmts, res)

        return res

    def get_cases(nodes, acc=None):
        # regroup the flat statement list: each Case/Default starts a new
        # group and every following statement is appended to it
        # (note: this overwrites the stmts attribute of the AST nodes)
        cases = []

        stmts = get_stmts(nodes)
        for stmt in stmts:
            if stmt.__class__ in [AST.Case, AST.Default]:
                cases.append(stmt)
                stmt.stmts = []
            else:
                cases[-1].stmts.append(stmt)

        return cases

    cond = self._handle_node(node.cond, scope, ctxt, stream)
    default_idx = None
    found_match = False

    # the regrouped cases are cached on the node so that the regrouping
    # is only done the first time this switch is interpreted
    cases = getattr(node, "pfp_cases", None)
    if cases is None:
        cases = get_cases(node.stmt.children())
        node.pfp_cases = cases

    for idx, child in enumerate(cases):
        if child.__class__ == AST.Default:
            # remember the default, but keep looking for a matching case
            default_idx = idx
            continue
        elif child.__class__ == AST.Case:
            expr = self._handle_node(child.expr, scope, ctxt, stream)
            if expr == cond:
                found_match = True
                exec_case(idx, cases)
                break

    if default_idx is not None and not found_match:
        exec_case(default_idx, cases)
def _handle_break(self, node, scope, ctxt, stream):
    """Handle Break nodes.

    Never returns normally; the break is signalled to the enclosing
    loop/switch handler by raising an exception.

    :raises errors.InterpBreak: always
    """
    self._dlog("handling break")
    break_signal = errors.InterpBreak()
    raise break_signal
def _handle_continue(self, node, scope, ctxt, stream):
    """Handle Continue nodes.

    Never returns normally; the continue is signalled to the enclosing
    loop handler by raising an exception.

    :raises errors.InterpContinue: always
    """
    self._dlog("handling continue")
    continue_signal = errors.InterpContinue()
    raise continue_signal
def _handle_decl_list(self, node, scope, ctxt, stream):
    """Handle DeclList nodes by handling each declaration in order.

    :node: the declaration list node (``decls`` is read)
    :scope: the current scope
    :ctxt: the current context
    :stream: the data stream
    :returns: None
    """
    self._dlog("handling decl list")
    for single_decl in node.decls:
        self._handle_node(single_decl, scope, ctxt, stream)
# -----------------------------
# UTILITY
# -----------------------------
def _mark_id_as_lazy(self, node):
    """Flag the ``AST.ID`` reachable from ``node`` as lazy.

    The chain of ``type`` attributes is followed starting at ``node``.
    The first ``AST.ID`` found gets ``is_lazy = True``; if the chain
    ends without one, nothing is changed.
    """
    cursor = node
    while cursor is not None:
        if cursor.__class__ is AST.ID:
            cursor.is_lazy = True
            return
        # a missing or None ``type`` ends the walk
        cursor = getattr(cursor, "type", None)
def _node_is_breakable(self, node):
if not self._int3:
return False
breakable_classes = [
AST.FileAST,
AST.Decl,
# AST.ByRefDecl,
# AST.TypeDecl,
# AST.Struct,
# AST.IdentifierType,
AST.Typedef,
# AST.Constant,
AST.BinaryOp,
AST.Assignment,
# AST.ID,
AST.UnaryOp,
# AST.FuncDef,
AST.FuncCall,
# AST.FuncDecl,
# AST.ParamList,
# AST.ExprList,
# AST.Compound,
AST.Return,
AST.ArrayDecl,
AST.Continue,
AST.Break,
AST.Switch,
AST.Case,
]
return node.__class__ in breakable_classes
def _create_scope(self):
    """Create a new scope pre-populated with the native functions.

    :returns: a ``Scope`` in which every entry of ``self._natives`` has
        been added as a local
    """
    new_scope = Scope(self._log)
    for name, native in six.iteritems(self._natives):
        new_scope.add_local(name, native)
    return new_scope
def _get_value(self, node, scope, ctxt, stream):
    """Return the value of the node.

    The node is expected to be either an AST.ID instance or a constant.

    :returns: ``_pfp__value`` of the handled result when it is a
        ``fields.Field``; otherwise the handled result itself
    """
    handled = self._handle_node(node, scope, ctxt, stream)
    if not isinstance(handled, fields.Field):
        # assume it's a constant
        return handled
    return handled._pfp__value
def _resolve_to_field_class(self, names, scope):
    """Resolve the names to a class in fields.py, resolving past
    typedefs, etc.

    :names: list of type name parts, e.g. ``['unsigned', 'char']``
    :scope: the scope used to look up names that are not built in
    :returns: the matching field class
    :raises errors.UnresolvedType: if the last name is neither built in
        nor resolvable through the scope
    """
    builtin_types = {
        "char": "Char",
        "int": "Int",
        "long": "Int",
        "int64": "Int64",
        "uint64": "UInt64",
        "short": "Short",
        "double": "Double",
        "float": "Float",
        "void": "Void",
        "string": "String",
        "wstring": "WString",
    }

    core = names[-1]
    if core not in builtin_types:
        type_info = scope.get_type(core)

        # the scope may already hold a ready-made field class
        if type(type_info) is type and issubclass(type_info, fields.Field):
            return type_info

        # otherwise it holds the list of names the typedef resolves to
        resolved_names = type_info
        if resolved_names is None:
            raise errors.UnresolvedType(self._coord, " ".join(names), " ")
        if resolved_names[-1] not in builtin_types:
            raise errors.UnresolvedType(
                self._coord, " ".join(names), " ".join(resolved_names)
            )

        # swap the typedef'd name for what it resolves to, without
        # touching the caller's list
        names = copy.copy(names)
        del names[-1]
        names.extend(resolved_names)

    last = names[-1]
    if len(names) >= 2 and last == names[-2] and last == "long":
        cls_name = "Int64"
    else:
        cls_name = builtin_types[last]

    if last in ["char", "short", "int", "long"] and "unsigned" in names[:-1]:
        cls_name = "U" + cls_name

    return getattr(fields, cls_name)
def is_forward_declared_struct(node):
    """Return True if ``node`` is a forward declaration of a struct.

    That is: an ``AST.Decl`` without an initializer whose type is an
    ``AST.Struct`` that has no declarations.
    """
    if not isinstance(node, AST.Decl):
        return False
    if node.init is not None:
        return False
    if not isinstance(node.type, AST.Struct):
        return False
    return node.type.decls is None