#!/usr/bin/env python
# encoding: utf-8
import contextlib
from intervaltree import IntervalTree, Interval
import json
import math
import re
import six
import struct
import pfp.errors as errors
import pfp.utils as utils
import pfp.bitwrap as bitwrap
import pfp.functions as functions
# Endianness markers compared against a field's ``endian`` setting. The
# characters are the same ones the ``struct`` module uses as byte-order
# prefixes (">" big-endian, "<" little-endian).
BIG_ENDIAN = ">"
LITTLE_ENDIAN = "<"
def true():
    """Build a fresh ``Int`` field holding the value 1.

    :returns: A new ``Int`` instance whose ``_pfp__value`` is 1
    """
    truthy = Int()
    truthy._pfp__value = 1
    return truthy
def false():
    """Build a fresh ``Int`` field holding the value 0.

    :returns: A new ``Int`` instance whose ``_pfp__value`` is 0
    """
    falsy = Int()
    falsy._pfp__value = 0
    return falsy
def get_value(field):
    """Unwrap ``field`` into a plain python value.

    :param field: Either a :any:`Field` instance or any other object
    :returns: ``field`` itself when it is not a ``Field``; the result of
        ``_array_to_str()`` for ``Array`` fields; otherwise the field's
        ``_pfp__value``
    """
    # anything that is not a Field is already a raw value
    if not isinstance(field, Field):
        return field

    # arrays expose their contents through _array_to_str()
    if isinstance(field, Array):
        return field._array_to_str()

    return field._pfp__value
def get_width(field):
    """Return the width, in bytes, of ``field``.

    :param field: A :any:`Field` (its ``width`` attribute is returned) or a
        python ``int`` (the number of bytes needed to hold it is computed)
    :returns: The width in bytes
    :raises Exception: if ``field`` is neither a ``Field`` nor an ``int``
    """
    if isinstance(field, Field):
        return field.width

    if not isinstance(field, int):
        raise Exception("Unexpected type: {}".format(field))

    # zero still occupies one byte
    if field == 0:
        return 1

    if field > 0:
        bits_needed = field.bit_length()
    else:
        # negative values: magnitude bits of the complement plus a sign bit
        bits_needed = (~field).bit_length() + 1

    return int(math.ceil(bits_needed / 8.0))
def get_str(field):
    """Convert ``field`` into a string via ``utils.string``.

    :param field: An ``Array``, a ``Char``, any other :any:`Field`, or a raw
        python value
    :returns: ``utils.string(...)`` applied to the array's
        ``_array_to_str()`` result, to ``chr()`` of a ``Char``'s value, or to
        ``get_value(field)`` for everything else
    """
    if isinstance(field, Array):
        raw = field._array_to_str()
    elif isinstance(field, Char):
        # a single char is stored numerically; turn it back into a character
        raw = chr(PYVAL(field))
    else:
        raw = get_value(field)

    converted = utils.string(raw)
    return converted
def inherit_hash(cls):
    """Class decorator that installs ``Field.__hash__`` on ``cls``.

    :param cls: The class being decorated
    :returns: The same class object, with ``__hash__`` set to
        ``Field.__hash__``
    """
    setattr(cls, "__hash__", Field.__hash__)
    return cls
# Short aliases: PYVAL unwraps a field to its python value (get_value),
# PYSTR converts a field to a string (get_str).
PYVAL = get_value
PYSTR = get_str
class BitfieldRW(object):
    """Handles reading and writing the total bits for the bitfield
    data type from the input stream, and correctly applying
    endian and bit direction settings.
    """

    def __init__(self, interp, cls):
        """Set the interpreter and field class for the BitfieldRW

        :param pfp.interp.PfpInterp interp: The interpreter being used
        :param pfp.fields.Field cls: The class (a subclass of :any:`pfp.fields.Field`) - used to determine total size of bitfield (if padded).
        """
        self.interp = interp
        self.cls = cls
        self.max_bits = self.cls.width * 8
        self.reserved_bits = 0

        # stream position recorded the first time bits are read through
        # _do_read_bits; None until then
        self.offset = None
        self.total_bits_read = 0

        # bits that were pulled from the stream but not yet handed out by
        # read_bits; None until the first read/reservation fills it
        self._cls_bits = None

        # used to write to the stream
        self._write_bits = []

    def reserve_bits(self, num_bits, stream):
        """Used to "reserve" ``num_bits`` amount of bits in order to keep track
        of consecutive bitfields (or are the called bitfield groups?).

        E.g. ::

            struct {
                char a:8, b:8;
                char c:4, d:4, e:8;
            }

        :param int num_bits: The number of bits to claim
        :param pfp.bitwrap.BitwrappedStream stream: The stream to reserve bits on
        :returns: If room existed for the reservation
        """
        padded = self.interp.get_bitfield_padded()
        num_bits = PYVAL(num_bits)

        if padded:
            # a padded bitfield group can never exceed the width of its type
            if num_bits + self.reserved_bits > self.max_bits:
                return False
        else:
            # if unpadded, always allow it
            if self._cls_bits is None:
                self._cls_bits = []

            # reserve bits will only be called just prior to reading the bits,
            # so check to see if we have enough bits in self._cls_bits, else
            # read what's missing
            missing = num_bits - len(self._cls_bits)
            if missing > 0:
                self._cls_bits += self._do_read_bits(stream, missing)

        self.reserved_bits += num_bits
        return True

    def tell(self, stream):
        """Return the byte offset that the next bits will come from.

        :param stream: The stream being read
        :returns: ``stream.tell()`` if nothing has been read through this
            object yet, otherwise the recorded start offset plus the number
            of whole bytes read so far
        """
        if self.offset is None:
            return stream.tell()

        bytes_offset = self.total_bits_read // 8
        return self.offset + bytes_offset

    def tell_bits(self):
        """Return ``8 - (total bits read modulo 8)``.
        """
        return 8 - (self.total_bits_read % 8)

    def read_bits(self, stream, num_bits, padded, left_right, endian):
        """Return ``num_bits`` bits, taking into account endianness and
        left-right bit directions

        :param stream: The stream to read from when no buffered bits exist
        :param int num_bits: The number of bits to return
        :param bool padded: If the bitfield is padded (the full width of the
            field class is read up front and buffered)
        :param bool left_right: Take bits from the front of the buffer when
            true, from the end of the buffer when false
        :param endian: ``BIG_ENDIAN`` or ``LITTLE_ENDIAN``
        :returns: A list of bits
        :raises errors.PfpError: if more bits are requested than are buffered
        """
        if self._cls_bits is None and padded:
            raw_bits = self._do_read_bits(stream, self.cls.width * 8)
            self._cls_bits = self._endian_transform(raw_bits, endian)

        if self._cls_bits is not None:
            if num_bits > len(self._cls_bits):
                raise errors.PfpError("BitfieldRW reached invalid state")

            if left_right:
                res = self._cls_bits[:num_bits]
                self._cls_bits = self._cls_bits[num_bits:]
            else:
                # slice with an explicit index: [-num_bits:] would return the
                # whole buffer (and [:-num_bits] nothing) when num_bits is 0
                split = len(self._cls_bits) - num_bits
                res = self._cls_bits[split:]
                self._cls_bits = self._cls_bits[:split]
        else:
            res = self._do_read_bits(stream, num_bits)

        self.total_bits_read += len(res)
        return res

    def write_bits(self, stream, raw_bits, padded, left_right, endian):
        """Write the bits. Once the size of the written bits is equal
        to the number of the reserved bits, flush it to the stream

        :param stream: The stream to write to (must provide ``write_bits``)
        :param list raw_bits: The bits to write
        :param bool padded: If the bitfield is padded
        :param bool left_right: Append new bits when true, prepend when false
        :param endian: ``BIG_ENDIAN`` or ``LITTLE_ENDIAN``
        """
        if padded:
            if left_right:
                self._write_bits += raw_bits
            else:
                self._write_bits = raw_bits + self._write_bits

            if len(self._write_bits) == self.reserved_bits:
                bits = self._endian_transform(self._write_bits, endian)

                # if it's padded, and all of the bits in the field weren't used,
                # we need to flush out the unused bits
                # TODO should we save the value of the unused bits so the data that
                # is written out matches exactly what was read?
                total_bits = self.cls.width * 8
                if self.reserved_bits < total_bits:
                    filler = [0] * (total_bits - self.reserved_bits)
                    if left_right:
                        bits += filler
                    else:
                        bits = filler + bits

                stream.write_bits(bits)
                self._write_bits = []
        else:
            # if an unpadded field ended up using the same BitfieldRW and
            # as a previous padded field, there will be unwritten bits left in
            # self._write_bits. These need to be flushed out as well
            if len(self._write_bits) > 0:
                stream.write_bits(self._write_bits)
                self._write_bits = []

            stream.write_bits(raw_bits)

    def _endian_transform(self, bits, endian):
        """Reorder ``bits`` one byte (8 bits) at a time.

        :param list bits: The bits to reorder
        :param endian: ``BIG_ENDIAN`` keeps byte order; anything else
            reverses the order of the bytes
        :returns: A new list of bits
        """
        res = []

        # perform endianness transformation
        for x in six.moves.range(self.cls.width):
            if endian == BIG_ENDIAN:
                curr_byte_idx = x
            else:
                curr_byte_idx = self.cls.width - x - 1

            res += bits[curr_byte_idx * 8 : (curr_byte_idx + 1) * 8]

        return res

    def _do_read_bits(self, stream, num):
        """Read ``num`` number of bits from the stream, remembering the
        stream offset of the very first read in ``self.offset``
        """
        if self.offset is None:
            self.offset = stream.tell()

        return stream.read_bits(num)
class Field(object):
    """Core class for all fields used in the Pfp DOM.

    All methods use the _pfp__XXX naming convention to
    avoid conflicting names used in templates, since
    struct fields will implement ``__getattr__`` and
    ``__setattr__`` to directly access child fields"""

    _pfp__interp = None

    _pfp__name = None
    """The name of the Field"""

    _pfp__parent = None
    """The parent of the field"""

    _pfp__prev_sibling = None
    """The previous field in this scope"""

    _pfp__next_sibling = None
    """The next field in this scope"""

    _pfp__watchers = []
    """All fields that are watching this field"""

    _pfp__watch_fields = []
    """All fields that this field is watching"""

    def __init__(self, stream=None, metadata_processor=None):
        """Initialize the per-instance bookkeeping and, when ``stream`` is
        given, immediately parse the field from it.

        :param stream: Optional stream to parse the field from
        :param metadata_processor: Optional callable, invoked with no
            arguments by :any:`_pfp__process_metadata`
        """
        super(Field, self).__init__()
        self._pfp__name = None
        self._pfp__frozen = False

        self._pfp__offset = -1
        self._pfp__offset_bits = None

        # watchers to update when something changes
        self._pfp__watchers = []
        self._pfp__parent = None
        self._pfp__watch_fields = []
        self._pfp__no_notify = False

        self._pfp__packer = None
        self._pfp__unpack = None
        # NOTE: this instance attribute shadows the _pfp__pack method
        # defined on the class
        self._pfp__pack = None
        self._pfp__pack_type = None
        self._pfp__no_unpack = False
        self._pfp__parsed_packed = None
        self._pfp__metadata_processor = metadata_processor

        # the unpacked (nested) field, set by _pfp__unpack_data
        self._ = None

        self._pfp__array_idx = None

        self._pfp__snapshot_stack = []

        if stream is not None:
            self._pfp__parse(stream, save_offset=True)

    def _pfp__get_class(self):
        """Return the class for this field. This would be used for things like
        integer promotion and type casting.
        """
        return self.__class__

    # see #51 - fields should have a method of returning the full path of its name
    def _pfp__path(self):
        """Return the full pathname of this field. E.g. given
        the template below, the ``a`` field would have a full
        path of ``root.nested.a``

        .. code-block:: c

            struct {
                struct {
                    char a;
                } nested;
            } root;
        """
        curr = self
        res = []
        while curr is not None:
            # don't show the meta __root name in the path
            if curr._pfp__name == "__root" and curr._pfp__parent is None:
                break

            if curr._pfp__name is not None:
                res.append(curr._pfp__name)

            # when the parent is an Array, skip over it and continue from
            # the array's own parent
            if isinstance(curr._pfp__parent, Array):
                curr = curr._pfp__parent._pfp__parent
            else:
                curr = curr._pfp__parent

        return ".".join(reversed(res))

    def _pfp__snapshot(self, recurse=True):
        """Save off the current value of the field

        :param bool recurse: Unused here; accepted so subclasses with
            children can share the signature
        """
        if hasattr(self, "_pfp__value"):
            self._pfp__snapshot_stack.append(self._pfp__value)

    def _pfp__restore_snapshot(self, recurse=True):
        """Restore a saved value snapshot

        :param bool recurse: Unused here; accepted so subclasses with
            children can share the signature
        """
        if hasattr(self, "_pfp__value"):
            self._pfp__value = self._pfp__snapshot_stack.pop()

    def _pfp__process_metadata(self):
        """Process the metadata once the entire struct has been
        declared.

        Calls the metadata processor given at construction. For every
        ``"watch"`` entry a watch is registered, and for every ``"packed"``
        entry the packer is configured and the field's ``raw_data`` is
        unpacked.
        """
        if self._pfp__metadata_processor is None:
            return

        metadata_info = self._pfp__metadata_processor()
        if not isinstance(metadata_info, list):
            return

        for metadata in metadata_info:
            if metadata["type"] == "watch":
                self._pfp__set_watch(
                    metadata["watch_fields"],
                    metadata["update_func"],
                    *metadata["func_call_info"]
                )

            elif metadata["type"] == "packed":
                # pass everything but "type" on as keyword arguments, without
                # mutating the dict handed back by the metadata processor
                packer_kwargs = dict(
                    (key, val)
                    for key, val in metadata.items()
                    if key != "type"
                )
                self._pfp__set_packer(**packer_kwargs)
                if self._pfp__can_unpack():
                    self._pfp__unpack_data(self.raw_data)

    def _pfp__watch(self, watcher):
        """Add the watcher to the list of fields that
        are watching this field

        If this field's parent is a ``Union``, the watcher is registered on
        the union instead.
        """
        if self._pfp__parent is not None and isinstance(
            self._pfp__parent, Union
        ):
            self._pfp__parent._pfp__watch(watcher)
        else:
            self._pfp__watchers.append(watcher)

    def _pfp__set_watch(self, watch_fields, update_func, *func_call_info):
        """Subscribe to update events on each field in ``watch_fields``, using
        ``update_func`` to update self's value when ``watch_field``
        changes"""
        self._pfp__watch_fields = watch_fields

        for watch_field in watch_fields:
            watch_field._pfp__watch(self)
        self._pfp__update_func = update_func
        self._pfp__update_func_call_info = func_call_info

    def _pfp__set_packer(
        self,
        pack_type,
        packer=None,
        pack=None,
        unpack=None,
        func_call_info=None,
    ):
        """Set the packer/pack/unpack functions for this field, as
        well as the pack type.

        :pack_type: The data type of the packed data
        :packer: A function that can handle packing and unpacking. First
            arg is true/false (to pack or unpack). Second arg is the stream.
            Must return an array of chars.
        :pack: A function that packs data. It must accept an array of chars and return an
            array of chars that is a packed form of the input.
        :unpack: A function that unpacks data. It must accept an array of chars and
            return an array of chars
        :func_call_info: Extra positional arguments forwarded to the
            function's ``call`` method
        """
        self._pfp__pack_type = pack_type
        self._pfp__unpack = unpack
        self._pfp__pack = pack
        self._pfp__packer = packer
        self._pfp__pack_func_call_info = func_call_info

    def _pfp__pack_data(self):
        """Pack the nested field

        Builds the nested field (``self._``), runs the bytes through the
        packer (or pack) function, and re-parses this field from the result.
        """
        if self._pfp__pack_type is None:
            return

        tmp_stream = six.BytesIO()
        self._._pfp__build(bitwrap.BitwrappedStream(tmp_stream))
        raw_data = tmp_stream.getvalue()

        pack_func = self._pfp__packer
        pack_args = []
        if self._pfp__packer is not None:
            pack_func = self._pfp__packer
            # first argument true() asks the packer to pack
            pack_args = [true(), raw_data]
        elif self._pfp__pack is not None:
            pack_func = self._pfp__pack
            pack_args = [raw_data]

        # native functions take the raw bytes as-is; every other function is
        # handed the data as an Array of Char
        if not isinstance(pack_func, functions.NativeFunction):
            io_stream = bitwrap.BitwrappedStream(six.BytesIO(raw_data))
            pack_args[-1] = Array(len(raw_data), Char, io_stream)

        res = pack_func.call(
            pack_args, *self._pfp__pack_func_call_info, no_cast=True
        )
        if isinstance(res, Array):
            res = res._pfp__build()

        io_stream = six.BytesIO(res)
        tmp_stream = bitwrap.BitwrappedStream(io_stream)

        # re-parse without triggering another unpack; always clear the flag,
        # even if parsing fails
        self._pfp__no_unpack = True
        try:
            self._pfp__parse(tmp_stream)
        finally:
            self._pfp__no_unpack = False

    def _pfp__can_unpack(self):
        """Return if this field has a packer/pack/unpack methods
        set as well as a pack type
        """
        return self._pfp__pack_type is not None

    def _pfp__unpack_data(self, raw_data):
        """Means that the field has already been parsed normally,
        and that it now needs to be unpacked.

        :raw_data: A string of the data that the field consumed while parsing
        """
        if self._pfp__pack_type is None:
            return
        if self._pfp__no_unpack:
            return

        unpack_func = self._pfp__packer
        unpack_args = []
        if self._pfp__packer is not None:
            unpack_func = self._pfp__packer
            # first argument false() asks the packer to unpack
            unpack_args = [false(), raw_data]
        elif self._pfp__unpack is not None:
            unpack_func = self._pfp__unpack
            unpack_args = [raw_data]

        # native functions take the raw bytes as-is; every other function is
        # handed the data as an Array of Char
        if not isinstance(unpack_func, functions.NativeFunction):
            io_stream = bitwrap.BitwrappedStream(six.BytesIO(raw_data))
            unpack_args[-1] = Array(len(raw_data), Char, io_stream)

        res = unpack_func.call(
            unpack_args, *self._pfp__pack_func_call_info, no_cast=True
        )
        if isinstance(res, Array):
            res = res._pfp__build()

        io_stream = six.BytesIO(res)
        tmp_stream = bitwrap.BitwrappedStream(io_stream)

        tmp_stream.padded = self._pfp__interp.get_bitfield_padded()

        self._ = self._pfp__parsed_packed = self._pfp__pack_type(tmp_stream)

        # repack this field whenever the unpacked data changes
        self._._pfp__watch(self)

    def _pfp__handle_updated(self, watched_field):
        """Handle the watched field that was updated
        """
        self._pfp__no_notify = True
        try:
            # nested data has been changed, so rebuild the
            # nested data to update the field
            # TODO a global setting to determine this behavior?
            # could slow things down a bit for large nested structures

            # notice the use of _is_ here - 'is' != '=='. '==' uses
            # the __eq__ operator, while is compares id(object) results
            if watched_field is self._:
                self._pfp__pack_data()
            elif self._pfp__update_func is not None:
                self._pfp__update_func.call(
                    [self] + self._pfp__watch_fields,
                    *self._pfp__update_func_call_info
                )
        finally:
            # never leave notifications disabled if the update raised
            self._pfp__no_notify = False

    def _pfp__notify_update(self, child=None):
        """Tell every watcher of this field that it changed, then pass the
        notification on to the parent.

        :param child: The child that triggered the update (unused here)
        """
        for watcher in self._pfp__watchers:
            watcher._pfp__handle_updated(self)

        if self._pfp__parent is not None:
            self._pfp__parent._pfp__notify_update(self)

    def _pfp__width(self):
        """Return the width of the field (sizeof)
        """
        raw_output = six.BytesIO()
        output = bitwrap.BitwrappedStream(raw_output)
        self._pfp__build(output)
        output.flush()
        return len(raw_output.getvalue())

    def _pfp__freeze(self):
        """Freeze the field so that it cannot be modified (const)
        """
        self._pfp__frozen = True

    def _pfp__get_root_value(self, val):
        """helper function to fetch the root value of an object"""
        if isinstance(val, Field):
            return val._pfp__value
        else:
            return val

    def _pfp__set_value(self, new_val):
        """Set the new value if type checking is passes, potentially
        (TODO? reevaluate this) casting the value to something else

        :new_val: The new value
        :returns: The list of watchers that were notified of the change
        :raises errors.UnmodifiableConst: if the field is frozen
        """
        if self._pfp__frozen:
            raise errors.UnmodifiableConst()
        self._pfp__value = get_value(new_val)
        return self._pfp__notify_parent()

    def _pfp__notify_parent(self):
        """Notify this field's watchers and its parent of a change.

        :returns: The list of watchers notified (empty when notifications
            are currently suppressed)
        """
        if self._pfp__no_notify:
            return []

        notified_watchers = []
        for watcher in self._pfp__watchers:
            watcher._pfp__handle_updated(self)
            notified_watchers.append(watcher)

        if self._pfp__parent is not None:
            self._pfp__parent._pfp__notify_update(self)

        return notified_watchers

    def _pfp__build(self, output_stream=None, save_offset=False):
        """Pack this field into a string. If output_stream is specified,
        write the output into the output stream

        :output_stream: Optional output stream to write the results to
        :save_offset: If true, the current offset into the stream will be saved in the field
        :returns: Resulting string if ``output_stream`` is not specified. Else the number of bytes writtern.
        :raises NotImplementedError: always; subclasses must override
        """
        # NotImplemented is a constant, not an exception type
        raise NotImplementedError(
            "Inheriting classes must implement the _pfp__build function"
        )

    def _pfp__parse(self, stream, save_offset=False):
        """Parse this field from the ``stream``

        :stream: An IO stream that can be read from
        :save_offset: Save the offset into the stream
        :returns: None
        :raises NotImplementedError: always; subclasses must override
        """
        # NotImplemented is a constant, not an exception type
        raise NotImplementedError(
            "Inheriting classes must implement the _pfp__parse function"
        )

    def _pfp__maybe_unpack(self):
        """Should be called after initial parsing to unpack any
        nested data types

        Currently a no-op in every case.
        """
        if self._pfp__pack_type is None or (
            self._pfp__pack is not None and self._pfp__packer is not None
        ):
            return

        pass

    def _pfp__pack(self):
        """Should be called after initial parsing

        Currently a no-op. Instances created through ``__init__`` replace
        this name with the ``pack`` function attribute.
        """
        pass

    def __cmp__(self, other):
        """Compare the Field to something else, either another
        Field or something else

        :other: Another Field instance or something else
        :returns: negative, zero or positive, like ``cmp()``
        """
        val = get_value(other)
        mine = self._pfp__value
        # the builtin cmp() does not exist on python 3
        return (mine > val) - (mine < val)

    def __lt__(self, other):
        """Compare the Field to something else, either another
        Field or something else

        :other: The other field
        :returns: True if this field's value is less than the other's
        """
        val = get_value(other)
        return self._pfp__value < val

    def __le__(self, other):
        """Return True if this field's value is <= the other's value"""
        val = get_value(other)
        return self._pfp__value <= val

    def __gt__(self, other):
        """Compare the Field to something else, either another
        Field or something else

        :other: The other field
        :returns: True if this field's value is greater than the other's
        """
        val = get_value(other)
        return self._pfp__value > val

    def __ge__(self, other):
        """Return True if this field's value is >= the other's value"""
        val = get_value(other)
        return self._pfp__value >= val

    def __ne__(self, other):
        """Return True if this field's value differs from the other's value"""
        val = get_value(other)
        return self._pfp__value != val

    def __eq__(self, other):
        """See if the two items are equal (True/False)

        :other: Another Field instance or a raw value
        :returns: True if this field's value equals the other's value
        """
        val = get_value(other)
        # compare against the unwrapped value, not the Field object itself
        return self._pfp__value == val

    # see #49 - needed for some fuzzing functionality changes (python3
    # was complaining about Fields not being hashable)
    #
    # Also note - this is not inheritable in python3 and MUST be explicitly
    # set: https://stackoverflow.com/a/1608907
    def __hash__(self):
        """Hash the field by its built (serialized) form"""
        res = self._pfp__build()
        # bit fields return an array, not bytes
        if isinstance(res, list):
            return str(res).__hash__()
        return res.__hash__()

    def __repr__(self):
        return "{}({!r})".format(self.__class__.__name__, self._pfp__value)

    def __getitem__(self, idx):
        """Only index 0 is valid and yields the field itself.

        :raises IndexError: for any index other than 0
        """
        if idx != 0:
            raise IndexError(idx)
        return self

    def _pfp__show(self, level=0, include_offset=False):
        """Return a representation of this field

        :param int level: The indent level of the output
        :param bool include_offset: Include the parsed offsets of this field
        """
        return repr(self)

    def _pfp__promote(self, other):
        """Noop promotion on the base NumberBase class
        """
        return other
@inherit_hash
class Void(Field):
    """The void field - used for return value of a function"""
@inherit_hash
class ImplicitArrayWrapper(Field):
    """Proxy that pairs the most recently added field of a given name with
    the implicit array holding every field of that name.

    Attribute writes, and reads of attributes not found by normal lookup,
    are forwarded to ``last_field``; indexing goes to ``implicit_array``.
    """

    last_field = None
    implicit_array = None

    def __init__(self, last_field, implicit_array):
        """Redirect all attribute accesses to the ``last_field``, except for
        array indexing. Array indexing is forwarded on to the
        ``implicit_array``.
        """
        # go around our own forwarding __setattr__ so the two references
        # are stored on the wrapper itself
        store = super(Field, self).__setattr__
        store("last_field", last_field)
        store("implicit_array", implicit_array)

    def _pfp__get_class(self):
        """Return the last field class type and not the array type. Overrides
        :any:`Field._pfp__get_class`.
        """
        wrapped = self.last_field
        return wrapped._pfp__get_class()

    def __setattr__(self, name, value):
        """Custom setattr that forwards all sets to the last_field
        """
        target = self.last_field
        return setattr(target, name, value)

    def __getattr__(self, name):
        """Custom getattr that forwards all gets to last_field.
        """
        target = self.last_field
        return getattr(target, name)

    def __getitem__(self, key):
        """Let this ImplicitArrayWrapper act like an array
        """
        items = self.implicit_array
        return items[key]
@inherit_hash
class Struct(Field):
    """The struct field"""

    _pfp__show_name = "struct"

    _pfp__children = []
    """All children of the struct, in order added"""

    _pfp__implicit_arrays = {}
    """Mapping of all implicit arrays in this struct. All implicit arrays will
    be resolved to a concrete array after parsing is complete"""

    _pfp__name_collisions = {}
    """Counters for any naming collisions"""

    _pfp__scope = None

    def __init__(self, stream=None, metadata_processor=None):
        """Create an empty struct.

        :param stream: Optional stream; only used to record the struct's
            starting offset
        :param metadata_processor: Optional callable passed on to
            :any:`Field.__init__`
        """
        # these go through the parent's __setattr__ because our own
        # __setattr__ needs _pfp__children_map to already exist

        # ordered list of children
        super(Struct, self).__setattr__("_pfp__children", [])
        # initialize implicit arrays for this struct instance
        super(Struct, self).__setattr__("_pfp__implicit_arrays", {})
        # for quick child access
        super(Struct, self).__setattr__("_pfp__children_map", {})

        # reinit it here just for this instance...
        self._pfp__name_collisions = {}

        super(Struct, self).__init__(metadata_processor=metadata_processor)

        if stream is not None:
            self._pfp__offset = stream.tell()

    def _pfp__finalize(self):
        """Finalize the results of parsing the data. Currently this involves:

        * resolving implicit arrays to concrete arrays
        """
        to_swap = []
        for child_name, child in six.iteritems(self._pfp__children_map):
            if isinstance(child, Struct):
                child._pfp__finalize()
                continue
            if child_name not in self._pfp__implicit_arrays:
                continue
            to_swap.append((child_name, child))

        # swap after iterating so the map is not modified while being walked
        for child_name, child in to_swap:
            implicit_array = self._pfp__implicit_arrays[child_name]
            self._pfp__children_map[child_name] = implicit_array
            self._pfp__scope.add_var(child_name, implicit_array)

    def _pfp__snapshot(self, recurse=True):
        """Save off the current value of the field

        :param bool recurse: Also snapshot every child
        """
        super(Struct, self)._pfp__snapshot(recurse=recurse)

        if recurse:
            for child in self._pfp__children:
                child._pfp__snapshot(recurse=recurse)

    def _pfp__restore_snapshot(self, recurse=True):
        """Restore the snapshotted value without triggering any events

        :param bool recurse: Also restore every child
        """
        super(Struct, self)._pfp__restore_snapshot(recurse=recurse)

        if recurse:
            for child in self._pfp__children:
                child._pfp__restore_snapshot(recurse=recurse)

    def _pfp__process_fields_metadata(self):
        """Tell each child to process its metadata
        """
        for child in self._pfp__children:
            child._pfp__process_metadata()

    def _pfp__set_value(self, value):
        """Initialize the struct. Value should be an array of
        fields, one each for each struct member.

        :value: An array of fields to initialize the struct with
        :returns: The combined list of watchers reported by each child's
            ``_pfp__set_value``
        :raises errors.UnmodifiableConst: if the struct is frozen
        :raises errors.PfpError: if the number of values does not match the
            number of children
        """
        if self._pfp__frozen:
            raise errors.UnmodifiableConst()

        if len(value) != len(self._pfp__children):
            raise errors.PfpError(
                "struct initialization has wrong number of members"
            )

        modified_watchers = []
        for child, child_value in zip(self._pfp__children, value):
            # tolerate children whose _pfp__set_value returns nothing
            modified_watchers += child._pfp__set_value(child_value) or []

        return modified_watchers

    def _pfp__add_child(self, name, child, stream=None, overwrite=False):
        """Add a child to the Struct field. If multiple consecutive fields are
        added with the same name, an implicit array will be created to store
        all fields of that name.

        :param str name: The name of the child
        :param pfp.fields.Field child: The field to add
        :param bool overwrite: Overwrite existing fields (False)
        :param pfp.bitwrap.BitwrappedStream stream: unused, but here for compatibility with Union._pfp__add_child
        :returns: The resulting field added
        """
        res = None
        if not overwrite and self._pfp__is_non_consecutive_duplicate(
            name, child
        ):
            res = self._pfp__handle_non_consecutive_duplicate(name, child)
        elif not overwrite and name in self._pfp__children_map:
            implicit_array = self._pfp__handle_implicit_array(name, child)

            # see #110 (https://github.com/d0c-s4vage/pfp/issues/110)
            # during parsing, duplicate (implicit) arrays should always
            # reference the last parsed variable of ``name``. However, if the
            # variable ``name`` is indexed, then the nth duplicate array item
            # should be returned.
            #
            # E.g.
            #
            #     int x;
            #     int x;
            #     int x;
            #     Printf("%d\n", x); // prints the latest x value
            #     Printf("%d\n", x[0]); // prints the first x value
            #
            self._pfp__implicit_arrays[name] = implicit_array
            wrapper = ImplicitArrayWrapper(child, implicit_array)
            self._pfp__children_map[name] = wrapper
            res = wrapper
        else:
            child._pfp__parent = self
            self._pfp__children.append(child)
            child._pfp__name = name
            self._pfp__children_map[name] = child
            res = child

        # link the result with the child that precedes the last entry
        if len(self._pfp__children) > 1:
            res._pfp__prev_sibling = self._pfp__children[-2]
            self._pfp__children[-2]._pfp__next_sibling = res

        return res

    def _pfp__handle_non_consecutive_duplicate(self, name, child, insert=True):
        """This new child, and potentially one already existing child, need to
        have a numeric suffix appended to their name.

        An entry will be made for this name in ``self._pfp__name_collisions`` to keep
        track of the next available suffix number

        :param str name: The colliding name
        :param child: The field to rename
        :param bool insert: Append ``child`` to the list of children
        :returns: ``child``
        """
        if name in self._pfp__children_map:
            previous_child = self._pfp__children_map[name]

            # DO NOT cause __eq__ to be called, we want to test actual objects, not comparison
            # operators
            if previous_child is not child:
                # rename the existing child first so it gets the lower suffix
                self._pfp__handle_non_consecutive_duplicate(
                    name, previous_child, insert=False
                )
                del self._pfp__children_map[name]

        next_suffix = self._pfp__name_collisions.setdefault(name, 0)
        new_name = "{}_{}".format(name, next_suffix)
        child._pfp__name = new_name
        self._pfp__name_collisions[name] = next_suffix + 1
        self._pfp__children_map[new_name] = child
        child._pfp__parent = self

        if insert:
            self._pfp__children.append(child)

        return child

    def _pfp__is_non_consecutive_duplicate(self, name, child):
        """Return True/False if the child is a non-consecutive duplicately named
        field. Consecutive duplicately-named fields are stored in an implicit array,
        non-consecutive duplicately named fields have a numeric suffix appended to their name"""
        if len(self._pfp__children) == 0:
            return False

        # it should be an implicit array
        if self._pfp__children[-1]._pfp__name == name:
            return False

        # if it's elsewhere in the children name map OR a collision sequence has already been
        # started for this name, it should have a numeric suffix
        # appended
        elif (
            name in self._pfp__children_map
            or name in self._pfp__name_collisions
        ):
            return True

        # else, no collision
        return False

    def _pfp__handle_implicit_array(self, name, child):
        """Handle inserting implicit array elements

        :param str name: The duplicated name
        :param child: The newly declared field
        :returns: The implicit ``Array`` that now contains ``child``
        """
        existing_child = self._pfp__children_map[name]
        existing_implicit_array = self._pfp__implicit_arrays.get(name, None)

        if isinstance(existing_implicit_array, Array):
            # I don't think we should check this
            #
            # if existing_child.field_cls != child.__class__:
            #    raise errors.PfpError("implicit arrays must be sequential!")
            existing_implicit_array.append(child)
            return existing_implicit_array

        cls = (
            child._pfp__class
            if hasattr(child, "_pfp__class")
            else child.__class__
        )
        ary = Array(0, cls)
        # since the array starts with the first item
        ary._pfp__offset = existing_child._pfp__offset
        ary._pfp__parent = self
        ary._pfp__name = name
        ary.implicit = True
        ary.append(existing_child)
        ary.append(child)

        # find the existing child by identity (not ==) so the array can take
        # its place; falls back to the last slot if it is not found
        exist_idx = -1
        for idx, candidate in enumerate(self._pfp__children):
            if candidate is existing_child:
                exist_idx = idx
                break

        self._pfp__children[exist_idx] = ary
        self._pfp__children_map[name] = ary
        return ary

    def _pfp__parse(self, stream, save_offset=False):
        """Parse the incoming stream

        :stream: Input stream to be parsed
        :save_offset: Save the offset into the stream
        :returns: Number of bytes parsed
        """
        if save_offset:
            self._pfp__offset = stream.tell()

        res = 0
        for child in self._pfp__children:
            res += child._pfp__parse(stream, save_offset)
        return res

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the field and write the result into the stream

        :stream: An IO stream that can be written to
        :save_offset: Save the offset into the stream
        :returns: The built data when ``stream`` is None, otherwise the sum
            of what each child's ``_pfp__build`` returned
        """
        if save_offset and stream is not None:
            self._pfp__offset = stream.tell()

        # returns either num bytes written or total data
        res = utils.binary("") if stream is None else 0

        # iterate IN ORDER
        for child in self._pfp__children:
            child_res = child._pfp__build(stream, save_offset)
            res += child_res

        return res

    def __getattr__(self, name):
        """Custom __getattr__ for quick access to the children"""
        children_map = super(Struct, self).__getattribute__(
            "_pfp__children_map"
        )
        if name in children_map:
            return children_map[name]

        # only consult the scope if one has been assigned
        scope = self._pfp__scope
        if scope is not None:
            res = scope.get_var(name, recurse=False)
            if res is not None:
                return res

        # default getattr instead
        return super(Struct, self).__getattribute__(name)

    def __setattr__(self, name, value):
        """Custom __setattr__ for quick setting of children values

        If value is not an instance of ``Field``, assume it is the
        value for the field and that the field itself should not
        be overridden"""
        children_map = super(Struct, self).__getattribute__(
            "_pfp__children_map"
        )
        if name in children_map:
            if not isinstance(value, Field):
                children_map[name]._pfp__set_value(value)
            else:
                children_map[name] = value
            self._pfp__notify_parent()
            return children_map[name]
        else:
            # default setattr instead
            return super(Struct, self).__setattr__(name, value)

    def __repr__(self):
        return object.__repr__(self)

    def __eq__(self, other):
        """Structs are only equal to themselves (identity comparison)"""
        return self is other

    def _pfp__show(self, level=0, include_offset=False):
        """Show the contents of the struct

        :param int level: The indent level of the output
        :param bool include_offset: Prefix each line with the field's offset
        """
        res = []
        res.append(
            "{}{} {{".format(
                "{:04x} ".format(self._pfp__offset) if include_offset else "",
                self._pfp__show_name,
            )
        )
        for child in self._pfp__children:
            res.append(
                "{}{}{:10s} = {}".format(
                    " " * (level + 1),
                    "{:04x} ".format(child._pfp__offset)
                    if include_offset
                    else "",
                    child._pfp__name,
                    child._pfp__show(level + 1, include_offset),
                )
            )
        res.append("{}}}".format(" " * level))
        return "\n".join(res)

    def __iter__(self):
        """Iterate over this struct's children
        """
        return iter(self._pfp__children)
@inherit_hash
class Union(Struct):
    """A union field, where each member is an alternate
    view of the data"""

    # scratch buffer holding the most recently built child data
    _pfp__buff = None
    # size of the largest child, recorded by _pfp__parse
    _pfp__size = 0
    _pfp__show_name = "union"

    def __init__(self, stream=None, metadata_processor=None):
        """Init the union and its buff stream

        :param stream: Optional stream; only used to record the union's
            starting offset
        :param metadata_processor: Optional callable passed on to the
            parent initializer
        """
        super(Union, self).__init__(metadata_processor=metadata_processor)
        if stream is not None:
            self._pfp__offset = stream.tell()
        self._pfp__buff = six.BytesIO()

    def _pfp__add_child(self, name, child, stream=None, overwrite=False):
        """Add a child to the Union field

        :name: The name of the child
        :child: A :class:`.Field` instance
        :stream: Optional stream; when given it is rewound by the size of
            the child so the next member starts at the same position
        :overwrite: Overwrite existing fields (False); forwarded to
            :any:`Struct._pfp__add_child`
        :returns: The resulting field
        """
        res = super(Union, self)._pfp__add_child(
            name, child, overwrite=overwrite
        )
        self._pfp__buff.seek(0, 0)
        child._pfp__build(stream=self._pfp__buff)
        # bytes written by THIS child; the buffer itself may still hold a
        # longer tail left over from a previously added, larger child
        size = self._pfp__buff.tell()
        self._pfp__buff.seek(0, 0)

        if stream is not None:
            curr_pos = stream.tell()
            stream.seek(curr_pos - size, 0)

        return res

    def _pfp__notify_update(self, child=None):
        """Handle a child with an updated value

        Rebuilds ``child`` and pushes the resulting data into every other
        member, then continues with the normal notification chain.
        """
        if getattr(self, "_pfp__union_update_other_children", True):
            # guard against re-entry: updating the siblings below triggers
            # notifications that arrive back here
            self._pfp__union_update_other_children = False
            try:
                new_data = child._pfp__build()
                new_stream = bitwrap.BitwrappedStream(six.BytesIO(new_data))
                for other_child in self._pfp__children:
                    if other_child is child:
                        continue

                    if (
                        isinstance(other_child, Array)
                        and other_child.is_stringable()
                    ):
                        other_child._pfp__set_value(new_data)
                    else:
                        other_child._pfp__parse(new_stream)
                    new_stream.seek(0)
            finally:
                # re-arm the guard so later updates propagate as well
                self._pfp__union_update_other_children = True

        super(Union, self)._pfp__notify_update(child=child)

    def _pfp__parse(self, stream, save_offset=False):
        """Parse the incoming stream

        :stream: Input stream to be parsed
        :save_offset: Save the offset into the stream
        :returns: Number of bytes parsed (the size of the largest member)
        """
        if save_offset:
            self._pfp__offset = stream.tell()

        max_res = 0
        for child in self._pfp__children:
            child_res = child._pfp__parse(stream, save_offset)
            if child_res > max_res:
                max_res = child_res

            # rewind the stream so every member is parsed from the same spot
            stream.seek(-child_res, 1)

        self._pfp__size = max_res

        # consume the bytes of the largest member and keep a copy of them
        self._pfp__buff = six.BytesIO(stream.read(self._pfp__size))
        return max_res

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the union and write the result into the stream.

        :stream: Optional stream to write into
        :save_offset: Forwarded to each child's ``_pfp__build``
        :returns: The built data when ``stream`` is None, otherwise the size
            of the largest member
        """
        # start at 0 so an empty union neither seeks backwards nor reports
        # a negative size
        max_size = 0

        if stream is None:
            core_stream = six.BytesIO()
            new_stream = bitwrap.BitwrappedStream(core_stream)
        else:
            new_stream = stream

        for child in self._pfp__children:
            curr_pos = new_stream.tell()
            child._pfp__build(new_stream, save_offset)
            size = new_stream.tell() - curr_pos
            # every member is written over the same starting position
            new_stream.seek(-size, 1)

            if size > max_size:
                max_size = size

        # leave the stream positioned after the largest member
        new_stream.seek(max_size, 1)

        if stream is None:
            return core_stream.getvalue()
        else:
            return max_size

    def __setattr__(self, name, value):
        """Custom __setattr__ to keep track of the order things
        are writen (to mimic writing to memory)
        """
        res = super(Union, self).__setattr__(name, value)
        children_map = super(Struct, self).__getattribute__(
            "_pfp__children_map"
        )

        if name in children_map:
            field = getattr(self, name)
            # back to the start of the buffer
            self._pfp__buff.seek(0, 0)
            field._pfp__build(stream=self._pfp__buff)

        return res
@inherit_hash
class Dom(Struct):
    """The main container struct for a template

    The result of an interpreted template"""

    # class-level defaults so that __getattr__ can always read these names
    # through normal lookup, even before __init__ has assigned them
    _pfp__error = None
    _pfp__types = None

    def __init__(self, *args, **kwargs):
        """Create the Dom; all arguments are passed on to ``Struct``"""
        # name the class explicitly: super(self.__class__, ...) recurses
        # forever as soon as Dom is subclassed
        super(Dom, self).__init__(*args, **kwargs)

        # see keep_successful notes on pfp.parse and pfp.interp.PfpInterp.parse
        self._pfp__error = None
        self._pfp__types = None

    def __getattr__(self, attr_name):
        """Custom getattr for Dom class so types can also be
        accessed"""
        if self._pfp__types is not None and hasattr(
            self._pfp__types, attr_name
        ):
            return getattr(self._pfp__types, attr_name)
        else:
            return super(Dom, self).__getattr__(attr_name)

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the whole Dom.

        :stream: Optional stream to write into; wrapped in a
            ``BitwrappedStream`` if it is not one already
        :save_offset: Save each field's offset into the stream
        :returns: The built data when ``stream`` is None, otherwise whatever
            ``Struct._pfp__build`` returns
        """
        if stream is None:
            io_stream = six.BytesIO()
            tmp_stream = bitwrap.BitwrappedStream(io_stream)
            tmp_stream.padded = self._pfp__interp.get_bitfield_padded()
            super(Dom, self)._pfp__build(tmp_stream, save_offset=save_offset)

            # flush out any unaligned bitfields, etc
            tmp_stream.flush()
            res = io_stream.getvalue()
            return res
        else:
            if not isinstance(stream, bitwrap.BitwrappedStream):
                stream = bitwrap.BitwrappedStream(stream)
            return super(Dom, self)._pfp__build(
                stream, save_offset=save_offset
            )
@inherit_hash
class NumberBase(Field):
    """The base field for all numeric fields"""

    # can be set on individual fields, for all numbers (NumberBase.endian = ...),
    # or specific number classes (Int.endian = ...)
    endian = LITTLE_ENDIAN  # default endianness is LITTLE_ENDIAN
    width = 4  # number of bytes
    format = "i"  # struct format character; default signed int
    bitsize = None  # for IntBase
    _pfp__value = 0  # default value

    @classmethod
    def _pfp__width(cls):
        """Return the width of the current atomic type
        """
        return cls.width

    def __init__(
        self,
        stream=None,
        bitsize=None,
        metadata_processor=None,
        bitfield_rw=None,
        bitfield_padded=False,
        bitfield_left_right=False,
    ):
        """Special init for the bitsize

        :stream: forwarded to the parent constructor
        :bitsize: number of bits for a bitfield (plain value or Field), or
            ``None`` for a regular, full-width number
        :metadata_processor: forwarded to the parent constructor
        :bitfield_rw: object used to read/write the bits of a bitfield
        :bitfield_padded: passed through to ``bitfield_rw`` on read/write
        :bitfield_left_right: passed through to ``bitfield_rw`` on read/write
        """
        # these must be set before the parent constructor runs
        self.bitsize = get_value(bitsize)
        self.bitfield_rw = bitfield_rw

        # fields need to remember if they were parsed with padded bits or not
        self.bitfield_padded = bitfield_padded
        self.bitfield_left_right = bitfield_left_right

        super(NumberBase, self).__init__(
            stream, metadata_processor=metadata_processor
        )

    def __nonzero__(self):
        """Used for the not operator"""
        return self._pfp__value != 0

    def __bool__(self):
        """Used for the not operator"""
        return self._pfp__value != 0

    def _pfp__parse(self, stream, save_offset=False, set_val=True):
        """Parse the IO stream for this numeric field

        :stream: An IO stream that can be read from
        :save_offset: record the stream position as this field's offset
        :set_val: when True store the parsed data/value on the field; when
            False leave the field's value untouched and return the value
        :returns: The number of bytes parsed (``self.width``) when
            ``set_val`` is True, otherwise the parsed value
        :raises errors.PrematureEOF: if fewer than ``width`` bytes were read
        """
        if save_offset:
            self._pfp__offset = stream.tell()

        if self.bitsize is None:
            raw_data = stream.read(self.width)
            data = utils.binary(raw_data)
        else:
            self._pfp__offset = self.bitfield_rw.tell(stream)
            self._pfp__offset_bits = self.bitfield_rw.tell_bits()

            bits = self.bitfield_rw.read_bits(
                stream,
                self.bitsize,
                self.bitfield_padded,
                self.bitfield_left_right,
                self.endian,
            )
            # left-pad the bits with zeros up to the full width of the type
            width_diff = self.width - (len(bits) // 8) - 1
            bits_diff = 8 - (len(bits) % 8)
            padding = [0] * (width_diff * 8 + bits_diff)

            bits = padding + bits

            data = bitwrap.bits_to_bytes(bits)
            if self.endian == LITTLE_ENDIAN:
                # reverse the data
                data = data[::-1]

        if len(data) < self.width:
            raise errors.PrematureEOF()

        val = struct.unpack("{}{}".format(self.endian, self.format), data)[0]

        if set_val:
            self._pfp__data = data
            self._pfp__value = val
            return self.width
        else:
            return val

    def _pfp__build(
        self, stream=None, save_offset=False, ignore_bitfields=False,
    ):
        """Build the field and write the result into the stream

        :stream: An IO stream that can be written to, or ``None``
        :save_offset: record the stream position as this field's offset
        :ignore_bitfields: pack the full-width value even if this field is a
            bitfield
        :returns: with a stream, the number of bytes written (for bitfields:
            number of bits written floor-divided by 8); without a stream, the
            packed bytes (for bitfields: the list of bits)
        """
        if stream is not None and save_offset:
            self._pfp__offset = stream.tell()

        if ignore_bitfields or self.bitsize is None:
            data = struct.pack(
                "{}{}".format(self.endian, self.format), self._pfp__value
            )
            if stream is not None:
                stream.write(data)
                return len(data)
            else:
                return data
        else:
            # always pack big-endian here so the low-order bits are at the end
            data = struct.pack(
                "{}{}".format(BIG_ENDIAN, self.format), self._pfp__value
            )

            num_bytes = int(math.ceil(self.bitsize / 8.0))
            bit_data = data[-num_bytes:]

            raw_bits = bitwrap.bytes_to_bits(bit_data)
            bits = raw_bits[-self.bitsize :]

            if stream is not None:
                self.bitfield_rw.write_bits(
                    stream,
                    bits,
                    self.bitfield_padded,
                    self.bitfield_left_right,
                    self.endian,
                )
                return len(bits) // 8
            else:
                # TODO this can't be right....
                return bits

    def _dom_class(self, obj1, obj2):
        """Return the dominating numeric class between the two

        :obj1: first operand
        :obj2: second operand
        :returns: ``Double`` if either operand is a Double, else ``Float`` if
            either is a Float, else ``None`` (implicitly)
        """
        if isinstance(obj1, Double) or isinstance(obj2, Double):
            return Double
        if isinstance(obj1, Float) or isinstance(obj2, Float):
            return Float

    # ------------------------------------------------------------------
    # in-place operators: all return self
    # ------------------------------------------------------------------

    def __iadd__(self, other):
        root = get_value(other)
        self._pfp__set_value(self._pfp__value + root)
        return self

    def __isub__(self, other):
        root = get_value(other)
        self._pfp__set_value(self._pfp__value - root)
        return self

    # NOTE: the remaining in-place operators assign to _pfp__value directly
    # instead of going through _pfp__set_value

    def __imul__(self, other):
        self._pfp__value *= get_value(other)
        return self

    def __idiv__(self, other):
        self._pfp__value /= get_value(other)
        return self

    def __iand__(self, other):
        self._pfp__value &= get_value(other)
        return self

    def __ixor__(self, other):
        self._pfp__value ^= get_value(other)
        return self

    def __ior__(self, other):
        self._pfp__value |= get_value(other)
        return self

    def __ifloordiv__(self, other):
        self._pfp__value //= get_value(other)
        return self

    def __imod__(self, other):
        self._pfp__value %= get_value(other)
        return self

    def __ipow__(self, other):
        self._pfp__value **= get_value(other)
        return self

    def __ilshift__(self, other):
        self._pfp__value <<= get_value(other)
        return self

    def __irshift__(self, other):
        self._pfp__value >>= get_value(other)
        return self

    # ------------------------------------------------------------------
    # binary/unary operators: each returns a new instance of self.__class__
    # whose value was set through _pfp__set_value
    # ------------------------------------------------------------------

    def __add__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value + get_value(other))
        return res

    def __sub__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value - get_value(other))
        return res

    def __mul__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value * get_value(other))
        return res

    def __truediv__(self, other):
        res = self.__class__()
        # if truediv is being called, then / should also behave like
        # truediv (2/3 == 0.6666 instead of 0 [classic division])
        # the default in python 3 is truediv
        res._pfp__set_value(self._pfp__value / get_value(other))
        return res

    def __div__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value / get_value(other))
        return res

    def __and__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value & get_value(other))
        return res

    def __xor__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value ^ get_value(other))
        return res

    def __or__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value | get_value(other))
        return res

    def __floordiv__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value // get_value(other))
        return res

    def __mod__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value % get_value(other))
        return res

    def __pow__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value ** get_value(other))
        return res

    def __lshift__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value << get_value(other))
        return res

    def __rshift__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value >> get_value(other))
        return res

    def __invert__(self):
        res = self.__class__()
        res._pfp__set_value(~self._pfp__value)
        return res

    def __neg__(self):
        res = self.__class__()
        res._pfp__set_value(-self._pfp__value)
        return res

    def __getattr__(self, val):
        """Delegate otherwise-missing dunder attributes to the raw value.

        :val: name of the attribute that normal lookup failed to find
        :returns: the attribute of ``_pfp__value`` with the same name
        :raises AttributeError: for any name that is not ``__dunder__`` style
            (or that the raw value does not have either)
        """
        # the original tested ``attr.endswith`` -- ``attr`` is undefined, so
        # every dunder lookup died with a NameError
        if val.startswith("__") and val.endswith("__"):
            return getattr(self._pfp__value, val)
        raise AttributeError(val)
@inherit_hash
class IntBase(NumberBase):
    """The base class for all integers"""

    signed = True

    def _pfp__maybe_promote(self, val1, val2):
        """Determine if val1 or val2 is larger - if one is larger, then the
        smaller will be promoted to the larger size. If val1 and val2 are the
        same size, val2 will be promoted to val1.

        If ``val2`` is the wider operand but is not an ``IntBase`` (e.g. a
        plain Python int too large for ``val1``'s type), there is no field
        type to promote into, so both values are used unchanged.

        :returns: tuple of new (maybe_promoted_val1, maybe_promoted_val2),
            both as plain values
        """
        w_val1 = get_width(val1)
        w_val2 = get_width(val2)

        if w_val2 > w_val1:
            if isinstance(val2, IntBase):
                res = (val2._pfp__promote(val1), val2)
            else:
                # previously crashed with AttributeError: only IntBase
                # fields have _pfp__promote
                res = (val1, val2)
        else:
            # val1 is wider, or both have the same width
            res = (val1, val1._pfp__promote(val2))

        return (get_value(res[0]), get_value(res[1]))

    # from https://wiki.sei.cmu.edu/confluence/display/c/INT02-C.%2BUnderstand%2Binteger%2Bconversion%2Brules
    #
    # Integer Conversion Rank
    #
    # Every integer type has an integer conversion rank that determines how
    # conversions are performed. The ranking is based on the concept that
    # each integer type contains at least as many bits as the types ranked
    # below it. The following rules for determining integer conversion rank
    # are defined in the C Standard, subclause 6.3.1.1 [ISO/IEC 9899:2011]:
    #
    #   * No two signed integer types shall have the same rank, even if they
    #     have the same representation.
    #   * The rank of a signed integer type shall be greater than the rank of
    #     any signed integer type with less precision.
    #   * The rank of long long int shall be greater than the rank of long
    #     int, which shall be greater than the rank of int, which shall be
    #     greater than the rank of short int, which shall be greater than the
    #     rank of signed char.
    #   * The rank of any unsigned integer type shall equal the rank of the
    #     corresponding signed integer type, if any.
    #   * The rank of any standard integer type shall be greater than the
    #     rank of any extended integer type with the same width.
    #   * The rank of char shall equal the rank of signed char and unsigned
    #     char.
    #   * The rank of _Bool shall be less than the rank of all other standard
    #     integer types.
    #   * The rank of any enumerated type shall equal the rank of the
    #     compatible integer type.
    #   * The rank of any extended signed integer type relative to another
    #     extended signed integer type with the same precision is
    #     implementation-defined but still subject to the other rules for
    #     determining the integer conversion rank.
    #   * For all integer types T1, T2, and T3, if T1 has greater rank than
    #     T2 and T2 has greater rank than T3, then T1 has greater rank than
    #     T3.
    #
    # The integer conversion rank is used in the usual arithmetic conversions
    # to determine what conversions need to take place to support an
    # operation on mixed integer types.
    #
    # Usual Arithmetic Conversions
    #
    # The usual arithmetic conversions are rules that provide a mechanism to
    # yield a common type when both operands of a binary operator are
    # balanced to a common type or the second and third operands of the
    # conditional operator ( ? : ) are balanced to a common type. Conversions
    # involve two operands of different types, and one or both operands may
    # be converted. Many operators that accept arithmetic operands perform
    # conversions using the usual arithmetic conversions. After integer
    # promotions are performed on both operands, the following rules are
    # applied to the promoted operands:
    #
    #   * If both operands have the same type, no further conversion is
    #     needed.
    #   * If both operands are of the same integer type (signed or unsigned),
    #     the operand with the type of lesser integer conversion rank is
    #     converted to the type of the operand with greater rank.
    #   * If the operand that has unsigned integer type has rank greater than
    #     or equal to the rank of the type of the other operand, the operand
    #     with signed integer type is converted to the type of the operand
    #     with unsigned integer type.
    #   * If the type of the operand with signed integer type can represent
    #     all of the values of the type of the operand with unsigned integer
    #     type, the operand with unsigned integer type is converted to the
    #     type of the operand with signed integer type.
    #   * Otherwise, both operands are converted to the unsigned integer type
    #     corresponding to the type of the operand with signed integer type.

    def _pfp__promote(self, val):
        """Promote the provided value to the current class

        :val: another ``IntBase`` field, or a plain number
        :returns: a plain value converted to this field's width/signedness.
            This field's own value is not modified.
        """
        if isinstance(val, IntBase):
            # will automatically convert correctly between ints of
            # different sizes, unsigned/signed, etc
            raw = val._pfp__build(ignore_bitfields=True)
            # zero-pad on the most-significant side up to our width ...
            while len(raw) < self.width:
                if self.endian == BIG_ENDIAN:
                    raw = b"\x00" + raw
                else:
                    raw += b"\x00"

            # ... or drop most-significant bytes down to our width
            while len(raw) > self.width:
                if self.endian == BIG_ENDIAN:
                    raw = raw[1:]
                else:
                    raw = raw[:-1]

            val = self._pfp__parse(
                six.BytesIO(raw),
                save_offset=False,
                set_val=False,
            )
        else:
            mask = 1 << (8 * self.width)

            if self.signed:
                max_val = (mask // 2) - 1
                min_val = -(mask // 2)
            else:
                max_val = mask - 1
                min_val = 0

            if val < min_val:
                val += -(min_val)
                val &= mask - 1
                val -= -(min_val)
            elif val > max_val:
                # NOTE(review): for signed types this masks the value but
                # does not wrap it to a negative number; left as-is because
                # comparisons depend on the current result -- confirm intent
                val &= mask - 1

        return val

    def _pfp__set_value(self, new_val):
        """Set the value, potentially converting an unsigned
        value to a signed one (and vice versa)

        :raises errors.UnmodifiableConst: if the field is frozen
        :returns: the result of notifying the parent
        """
        if self._pfp__frozen:
            raise errors.UnmodifiableConst()

        promoted = self._pfp__promote(new_val)
        self._pfp__value = promoted

        return self._pfp__notify_parent()

    def __repr__(self):
        # zero-padded hex, two digits per byte of the type's width
        f = ":0{}x".format(self.width * 2)
        return ("{}({!r} [{" + f + "}]){}").format(
            self._pfp__cls_name(),
            self._pfp__value,
            self._pfp__value,
            ":{}".format(self.bitsize) if self.bitsize is not None else "",
        )

    def _pfp__cls_name(self):
        """Return the class name used in ``__repr__``
        """
        return self.__class__.__name__

    # ------------------------------------------------------------------
    # comparisons: both sides are run through _pfp__maybe_promote first
    # ------------------------------------------------------------------

    def __cmp__(self, other):
        """Compare the Field to something else, either another
        Field or something else (Python 2 only)

        :other: Another Field instance or something else
        :returns: result of cmp()
        """
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp(cmp_self, cmp_other)

    def __lt__(self, other):
        """:returns: True if this field is less than ``other``"""
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self < cmp_other

    def __le__(self, other):
        """:returns: True if this field is less than or equal to ``other``"""
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self <= cmp_other

    def __gt__(self, other):
        """:returns: True if this field is greater than ``other``"""
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self > cmp_other

    def __ge__(self, other):
        """:returns: True if this field is greater than or equal to ``other``"""
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self >= cmp_other

    def __ne__(self, other):
        """:returns: True if the two items are not equal"""
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self != cmp_other

    def __eq__(self, other):
        """See if the two items are equal (True/False)

        :other: Another Field instance or something else
        :returns: True if the promoted values are equal
        """
        cmp_self, cmp_other = self._pfp__maybe_promote(self, other)
        return cmp_self == cmp_other

    # ------------------------------------------------------------------
    # in-place operators: the operand is promoted to this type first and
    # the result is stored through _pfp__set_value
    # ------------------------------------------------------------------

    def __iadd__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value + root)
        return self

    def __isub__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value - root)
        return self

    def __imul__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value * root)
        return self

    def __idiv__(self, other):
        """In-place division, truncating toward zero.

        :raises ZeroDivisionError: if the promoted operand is zero
        """
        other = self._pfp__promote(other)
        root = get_value(other)
        lhs = self._pfp__value

        if isinstance(lhs, six.integer_types) and isinstance(
            root, six.integer_types
        ):
            # exact integer arithmetic: going through float division loses
            # precision once values no longer fit in a double's mantissa
            # (possible for 64-bit fields)
            quotient = abs(lhs) // abs(root)
            if (lhs < 0) != (root < 0):
                quotient = -quotient
        else:
            quotient = int(lhs / root)

        self._pfp__set_value(quotient)
        return self

    def __iand__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value & root)
        return self

    def __ixor__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value ^ root)
        return self

    def __ior__(self, other):
        other = self._pfp__promote(other)
        root = get_value(other)
        self._pfp__set_value(self._pfp__value | root)
        return self

    def __ifloordiv__(self, other):
        # will always be floordiv with IntBase numbers
        # (the bound method already carries self; passing it again raised
        # a TypeError)
        return self.__idiv__(other)

    def __imod__(self, other):
        # NOTE: We are not doing the same style of integer promotion here.
        # this current implementation matches the C implementation.
        #
        # See the test_imod function in test_integer_promotion.py
        self._pfp__value %= get_value(other)
        return self

    def __ipow__(self, other):
        self._pfp__value **= get_value(other)
        return self

    def __ilshift__(self, other):
        self._pfp__value <<= get_value(other)
        return self

    def __irshift__(self, other):
        self._pfp__value >>= get_value(other)
        return self

    # ------------------------------------------------------------------
    # binary/unary operators: each returns a new instance of self.__class__
    # ------------------------------------------------------------------

    def __add__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res += other
        return res

    def __sub__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res -= other
        return res

    def __mul__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res *= other
        return res

    def __truediv__(self, other):
        """Dividing ints should not return a float (python 3 true div
        behavior): there is no truediv for pfp integer fields, only classic
        integer division.
        """
        return self.__div__(other)

    def __div__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res.__idiv__(other)
        return res

    def __and__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res &= other
        return res

    def __xor__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res ^= other
        return res

    def __or__(self, other):
        res = self.__class__()
        res._pfp__set_value(self)
        # takes care of promotion already
        res |= other
        return res

    def __floordiv__(self, other):
        # same as classic division for integer fields (the bound method
        # already carries self; passing it again raised a TypeError)
        return self.__div__(other)

    def __mod__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value % get_value(other))
        return res

    def __pow__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value ** get_value(other))
        return res

    def __lshift__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value << get_value(other))
        return res

    def __rshift__(self, other):
        res = self.__class__()
        res._pfp__set_value(self._pfp__value >> get_value(other))
        return res

    def __invert__(self):
        res = self.__class__()
        res._pfp__set_value(~self._pfp__value)
        return res

    def __neg__(self):
        res = self.__class__()
        res._pfp__set_value(-self._pfp__value)
        return res
@inherit_hash
class Char(IntBase):
    """Signed char field: one byte wide, struct format ``b``."""

    format = "b"
    width = 1
@inherit_hash
class UChar(Char):
    """Unsigned char field: same width as ``Char``, struct format ``B``."""

    signed = False
    format = "B"
@inherit_hash
class Short(IntBase):
    """Signed short field: two bytes wide, struct format ``h``."""

    format = "h"
    width = 2
@inherit_hash
class UShort(Short):
    """Unsigned short field: same width as ``Short``, struct format ``H``."""

    signed = False
    format = "H"
@inherit_hash
class WChar(Short):
    """Signed wchar field (aka short); inherits everything from ``Short``."""
@inherit_hash
class WUChar(UShort):
    """Unsigned wuchar field (aka ushort); behaves like ``UShort``."""

    # restated explicitly even though UShort already sets it
    signed = False
@inherit_hash
class Int(IntBase):
    """Signed int field: four bytes wide, struct format ``i``."""

    format = "i"
    width = 4
@inherit_hash
class UInt(Int):
    """Unsigned int field: same width as ``Int``, struct format ``I``."""

    signed = False
    format = "I"
@inherit_hash
class Int64(IntBase):
    """Signed int64 field: eight bytes wide, struct format ``q``."""

    format = "q"
    width = 8
@inherit_hash
class UInt64(Int64):
    """Unsigned int64 field: same width as ``Int64``, struct format ``Q``."""

    signed = False
    format = "Q"
@inherit_hash
class Float(NumberBase):
    """Float field: four bytes wide, struct format ``f``."""

    format = "f"
    width = 4
@inherit_hash
class Double(NumberBase):
    """Double field: eight bytes wide, struct format ``d``."""

    format = "d"
    width = 8
@inherit_hash
class Enum(IntBase):
    """The enum field class"""

    enum_vals = None  # mapping of numeric value -> enum member name
    enum_cls = None  # numeric class supplying endian/width/format
    enum_name = None  # name matching the most recently parsed value

    def __init__(
        self,
        stream=None,
        enum_cls=None,
        enum_vals=None,
        bitsize=None,
        metadata_processor=None,
        bitfield_rw=None,
        bitfield_padded=False,
        bitfield_left_right=False,
    ):
        """Init the enum

        :stream: forwarded to the parent constructor
        :enum_cls: class whose ``endian``, ``width`` and ``format`` this enum
            copies
        :enum_vals: lookup from numeric value to enum member name
        :bitsize: forwarded to the parent constructor
        :metadata_processor: forwarded to the parent constructor
        :bitfield_rw: forwarded to the parent constructor
        :bitfield_padded: forwarded to the parent constructor
        :bitfield_left_right: forwarded to the parent constructor
        """
        # all of this must be in place before the parent constructor runs
        self.enum_name = None

        if enum_vals is not None:
            self.enum_vals = enum_vals

        if enum_cls is not None:
            self.enum_cls = enum_cls
            self.endian = enum_cls.endian
            self.width = enum_cls.width
            self.format = enum_cls.format

        super(Enum, self).__init__(
            stream,
            metadata_processor=metadata_processor,
            bitfield_rw=bitfield_rw,
            bitsize=bitsize,
            bitfield_padded=bitfield_padded,
            bitfield_left_right=bitfield_left_right,
        )

    def _pfp__parse(self, stream, save_offset=False, set_val=True):
        """Parse the IO stream for this enum

        :stream: An IO stream that can be read from
        :save_offset: forwarded to the parent ``_pfp__parse``
        :set_val: forwarded to the parent ``_pfp__parse``
        :returns: whatever the parent ``_pfp__parse`` returned (the number of
            bytes parsed when ``set_val`` is True)
        """
        res = super(Enum, self)._pfp__parse(stream, save_offset, set_val=set_val)

        # an enum created without enum_vals has no known names; previously
        # the membership test below raised TypeError on None
        known_vals = self.enum_vals if self.enum_vals is not None else {}

        if self._pfp__value in known_vals:
            self.enum_name = known_vals[self._pfp__value]
        else:
            self.enum_name = "?? UNK_ENUM ??"

        return res

    def __repr__(self):
        """Add the enum name to the int representation
        """
        res = super(Enum, self).__repr__()
        res += "({})".format(self.enum_name)
        return res

    def _pfp__cls_name(self):
        """Return ``Enum<name of enum_cls>``.

        Falls back to the plain class name when no ``enum_cls`` was given
        (e.g. for instances created with ``self.__class__()`` by the
        inherited arithmetic operators), instead of raising AttributeError.
        """
        if self.enum_cls is None:
            return super(Enum, self)._pfp__cls_name()
        return "Enum<{}>".format(self.enum_cls.__name__)
# --------------------------------
@inherit_hash
class Array(Field):
    """The array field.

    An array keeps its contents in one of two forms: ``raw_data`` (the raw
    bytes, used when parsing arrays of ``NumberBase`` items and when a
    string is assigned to a stringable array), or ``items`` (a list of
    Field instances). When ``raw_data`` is set it takes precedence.
    """

    width = -1
    """The number of items of the array. ``len(array_field)`` also works"""

    raw_data = None
    """The raw data of the array. Note that this will only be
    set if the array's items are a core type (E.g. Int, Char, etc)"""

    field_cls = None
    """The class for items in the array"""

    implicit = False
    """If the array is an implicit array or not"""

    def __init__(self, width, field_cls, stream=None, metadata_processor=None):
        """ Create an array field of size "width" from the stream

        :width: number of items, or ``None``
        :field_cls: class of the items
        :stream: stream to parse from; when ``None`` the array is filled
            with ``width`` default-constructed items instead
        :metadata_processor: forwarded to the parent constructor
        """
        # the parent must not parse: our attributes are not set up yet
        super(Array, self).__init__(
            stream=None, metadata_processor=metadata_processor
        )

        self.width = width
        self.field_cls = field_cls
        self.items = []
        self.raw_data = None
        self.implicit = False
        self._pfp__snapshot_raw_stack = []

        if stream is not None:
            self._pfp__parse(stream, save_offset=True)
        else:
            if width is not None:
                for x in six.moves.range(self.width):
                    self.items.append(self.field_cls())

    def _pfp__snapshot(self, recurse=True):
        """Save off the current value of the field
        """
        super(Array, self)._pfp__snapshot(recurse=recurse)
        self._pfp__snapshot_raw_stack.append(self.raw_data)

        if recurse:
            for item in self.items:
                item._pfp__snapshot(recurse=recurse)

    def _pfp__restore_snapshot(self, recurse=True):
        """Restore the snapshotted value without triggering any events
        """
        super(Array, self)._pfp__restore_snapshot(recurse=recurse)
        self.raw_data = self._pfp__snapshot_raw_stack.pop()

        if recurse:
            for item in self.items:
                item._pfp__restore_snapshot(recurse=recurse)

    def append(self, item):
        """Append ``item`` to ``items``, make this array its parent and
        update ``width``.
        """
        # TODO check for consistent type
        item._pfp__parent = self
        self.items.append(item)
        self.width = len(self.items)

    def is_stringable(self):
        """:returns: True if the item class is ``Char`` or ``UChar``"""
        # TODO WChar
        return self.field_cls in [Char, UChar]

    def _array_to_str(self, max_len=-1):
        """Return the array's contents as a string.

        :max_len: maximum length of the result, ``-1`` for no limit
        :returns: ``None`` if the array is not stringable; ``raw_data`` (or
            its truncated ``PYSTR`` form when ``max_len`` is given) when raw
            data is present; otherwise the characters of ``items`` up to the
            first zero item
        """
        if not self.is_stringable():
            return None

        if self.raw_data is not None:
            if max_len != -1:
                return PYSTR(self.raw_data)[:max_len]
            return self.raw_data

        res = ""
        for item in self.items:
            if max_len != -1 and len(res) >= max_len:
                break

            # null-terminate string
            if PYVAL(item) == 0:
                break

            # TODO WChar
            res += chr(PYVAL(item))
        return res

    def __eq__(self, other):
        """Compare a stringable array with a string-like object.

        :raises Exception: for every other combination (not implemented)
        """
        if self.is_stringable() and other.__class__ in [
            String,
            WString,
            str,
            bytes,
        ]:
            res = self._array_to_str()
            return utils.binary(res) == utils.binary(PYSTR(other))
        else:
            raise Exception("TODO")

    def __ne__(self, other):
        return not self.__eq__(other)

    def _pfp__set_value(self, value):
        """Replace the array's contents.

        :value: a ``String`` field or string/bytes (stringable arrays only),
            or a list/tuple of Fields or plain values
        :raises Exception: if ``value`` is none of the above
        :returns: the result of notifying the parent
        """
        is_string_type = False
        if isinstance(value, String):
            is_string_type = True
            value = value._pfp__build()
        else:
            for string_type in list(six.string_types) + [bytes]:
                if isinstance(value, string_type):
                    is_string_type = True
                    break

        if is_string_type and self.is_stringable():
            self.raw_data = value
            self.width = len(value)
            return self._pfp__notify_parent()

        if value.__class__ not in [list, tuple]:
            raise Exception("Error, invalid value for array")

        # this shouldn't be enforced
        # e.g.
        #    local uchar[16] = {0};
        # where the first item should be 0
        #
        # if len(value) != PYVAL(self.width):
        #     raise Exception("Array was declared as having {} items, not {} items".format(
        #         PYVAL(self.width),
        #         len(value)
        #     ))

        if len(value) > len(self.items):
            self.items.extend([None] * (len(value) - len(self.items)))

        for idx, item in enumerate(value):
            if not isinstance(item, Field):
                new_item = self.field_cls()
                new_item._pfp__set_value(item)
                item = new_item
            self.items[idx] = item

        self.width = len(self.items)

        # see #54 - make sure raw_data is set to None if overwriting with
        # a new array/list/set/tuple
        self.raw_data = None

        return self._pfp__notify_parent()

    def _pfp__parse(self, stream, save_offset=False):
        """Parse ``width`` items from the stream.

        Arrays of ``NumberBase`` items are read in one go into ``raw_data``;
        everything else is parsed item by item into ``items``. Does nothing
        (besides optionally saving the offset) when ``width`` is ``None``.
        """
        start_offset = stream.tell()
        if save_offset:
            self._pfp__offset = start_offset

        if self.width is None:
            return

        # will always be known widths for these field types
        if issubclass(self.field_cls, NumberBase):
            length = self.field_cls.width * PYVAL(self.width)
            self.raw_data = stream.read(length)

            if self._pfp__can_unpack():
                self._pfp__unpack_data(self.raw_data)

        else:
            # optimizations... should reuse existing fields??
            self.items = []
            for x in six.moves.range(PYVAL(self.width)):
                field = self.field_cls(stream)
                field._pfp__name = "{}[{}]".format(self._pfp__name, x)
                # field._pfp__parse(stream, save_offset)
                self.items.append(field)

            if self._pfp__can_unpack():
                curr_offset = stream.tell()
                stream.seek(start_offset, 0)
                data = stream.read(curr_offset - start_offset)

                # this shouldn't be necessary....
                # stream.seek(curr_offset, 0)

                self._pfp__unpack_data(data)

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the array.

        :returns: with a stream, the number of bytes written; without one,
            the built bytes
        """
        if stream is not None and save_offset:
            self._pfp__offset = stream.tell()

        if self.raw_data is None:
            # sum of byte counts when writing, concatenated bytes otherwise
            res = 0 if stream is not None else utils.binary("")
            for item in self.items:
                res += item._pfp__build(stream=stream, save_offset=save_offset)
        else:
            if stream is None:
                return self.raw_data
            else:
                stream.write(self.raw_data)
                return len(self.raw_data)
        return res

    def _pfp__handle_updated(self, watched_field):
        """Splice an updated item (one handed out by ``__getitem__``) back
        into ``raw_data``; defer everything else to the parent class.
        """
        if (
            self.raw_data is not None
            and watched_field._pfp__name is not None
            and watched_field._pfp__name.startswith(self._pfp__name)
            and watched_field._pfp__array_idx is not None
        ):
            data = watched_field._pfp__build()
            offset = watched_field.width * watched_field._pfp__array_idx
            self.raw_data = (
                self.raw_data[0:offset]
                + data
                + self.raw_data[offset + len(data) :]
            )
        else:
            super(Array, self)._pfp__handle_updated(watched_field)

    def _pfp__check_raw_idx(self, idx):
        """Validate an index into a ``raw_data``-backed array.

        Negative plain-integer indices count from the end, as they do for
        ``items``-backed arrays.

        :returns: the (possibly normalized) index
        :raises IndexError: if the index is out of range
        """
        count = PYVAL(self.width)
        norm = idx
        if isinstance(norm, six.integer_types) and norm < 0:
            norm += count
        if count < 0 or norm < 0 or norm + 1 > count:
            raise IndexError(idx)
        return norm

    def __getitem__(self, idx):
        """Return the item at ``idx``.

        For ``raw_data``-backed arrays a new field is parsed from the item's
        bytes on every call; this array is registered with it through
        ``_pfp__watch``.

        :raises IndexError: if ``idx`` is out of range
        """
        if self.raw_data is None:
            return self.items[idx]
        else:
            idx = self._pfp__check_raw_idx(idx)
            width = self.field_cls.width
            offset = width * idx
            data = self.raw_data[offset : offset + width]

            stream = bitwrap.BitwrappedStream(six.BytesIO(data))
            res = self.field_cls(stream)
            res._pfp__watch(self)
            res._pfp__parent = self
            res._pfp__array_idx = idx
            res._pfp__name = "{}[{}]".format(self._pfp__name, idx)
            return res

    def __setitem__(self, idx, value):
        """Set the item at ``idx`` to a Field or a plain value.

        :raises IndexError: if ``idx`` is out of range
        """
        if isinstance(value, Field):
            if self.raw_data is None:
                self.items[idx] = value
            else:
                # a negative index used to produce wrong slices here and
                # corrupt raw_data
                idx = self._pfp__check_raw_idx(idx)
                data = value._pfp__build()
                offset = self.field_cls.width * idx
                self.raw_data = (
                    self.raw_data[0:offset]
                    + data
                    + self.raw_data[offset + self.field_cls.width :]
                )
        else:
            self[idx]._pfp__set_value(value)

        self._pfp__notify_update(self)

    def __repr__(self):
        other = ""
        if self.is_stringable():
            res = self._array_to_str(20)
            other = " ({!r})".format(res)

        return "{}[{}]{}".format(
            self.field_cls.__name__
            if type(self.field_cls) is type
            else self.field_cls._typedef_name,
            PYVAL(self.width),
            other,
        )

    def _pfp__show(self, level=0, include_offset=False):
        """Return a (possibly multi-line) human readable dump of the array.
        """
        if self.is_stringable():
            res = self.__repr__()
            if self._ is not None:
                packed_show = self._._pfp__show(
                    level=level + 1, include_offset=False
                )
                res += "\n" + ("    " * (level + 1)) + "_ = " + packed_show
            return res

        res = [self.__repr__()]
        for idx, item in enumerate(self.items):
            item_res = "{}{}{}[{}] = {}".format(
                "    " * (level + 1),
                "{:04x} ".format(item._pfp__offset) if include_offset else "",
                self._pfp__name,
                idx,
                item._pfp__show(level + 2, include_offset),
            )
            res.append(item_res)
        return "\n".join(res)

    def __len__(self):
        if self.raw_data is not None:
            # integer division; no need to go through a float
            return len(self.raw_data) // self.field_cls.width
        else:
            return len(self.items)

    def __iter__(self):
        """Iterate over all items in this array

        ``raw_data``-backed arrays yield the same objects ``__getitem__``
        would return, so iteration agrees with ``len()`` and indexing
        (previously only ``items`` was iterated, which is empty or stale
        while ``raw_data`` is set).
        """
        if self.raw_data is None:
            return iter(self.items)

        limit = min(len(self), PYVAL(self.width))
        return (self[idx] for idx in six.moves.range(limit))
# http://www.sweetscape.com/010editor/manual/ArraysStrings.htm
@inherit_hash
class String(Field):
    """A null-terminated string. String fields should be interchangeable
    with char arrays"""

    # if the width is -1 when parse is called, read until null
    # termination.
    width = -1

    read_size = 1  # number of bytes read per step while parsing
    terminator = utils.binary("\x00")  # marks the end of the string

    def __init__(self, stream=None, metadata_processor=None):
        # must exist before the parent constructor runs
        self._pfp__value = utils.binary("")

        super(String, self).__init__(
            stream=stream, metadata_processor=metadata_processor
        )

    def _pfp__set_value(self, new_val):
        """Set the value of the String, taking into account
        escaping and such as well

        Anything that is not a Field is converted with ``utils.binary``
        before being handed to the parent implementation.
        """
        if not isinstance(new_val, Field):
            new_val = utils.binary(new_val)
        return super(String, self)._pfp__set_value(new_val)

    def _pfp__parse(self, stream, save_offset=False):
        """Read from the stream until the string is null-terminated

        :stream: The input stream
        :save_offset: record the stream position as this field's offset
        :raises errors.PrematureEOF: if the stream ends before a terminator
        :returns: None
        """
        if save_offset:
            self._pfp__offset = stream.tell()

        res = utils.binary("")
        while True:
            byte = utils.binary(stream.read(self.read_size))
            if len(byte) < self.read_size:
                raise errors.PrematureEOF()
            # note that the null terminator must be added back when
            # built again!
            if byte == self.terminator:
                break
            res += byte
        self._pfp__value = res

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the String field

        :stream: stream to write to, or ``None``
        :save_offset: record the stream position as this field's offset
        :returns: the value plus a trailing null byte when ``stream`` is
            ``None``, otherwise the number of bytes written
        """
        if stream is not None and save_offset:
            self._pfp__offset = stream.tell()

        data = self._pfp__value + utils.binary("\x00")
        if stream is None:
            return data
        else:
            stream.write(data)
            return len(data)

    def __getitem__(self, idx):
        """Return the byte at ``idx`` as a new ``Char`` field.

        :raises IndexError: if ``idx`` is negative or past the end
        """
        if idx < 0 or idx + 1 > len(self._pfp__value):
            raise IndexError(idx)

        val = self._pfp__value[idx : idx + 1]
        stream = six.BytesIO(val)
        res = Char(stream)
        return res

    def __setitem__(self, idx, val):
        """Replace the byte at ``idx``.

        :val: a Field (the last byte of its built form is used), an int
            (converted with ``chr``), or an already binary value
        :raises IndexError: if ``idx`` is negative or past the end
        """
        if idx < 0 or idx + 1 > len(self._pfp__value):
            raise IndexError(idx)

        if isinstance(val, Field):
            val = val._pfp__build()[-1:]
        elif isinstance(val, int):
            val = utils.binary(chr(val))

        self._pfp__value = (
            self._pfp__value[0:idx] + val + self._pfp__value[idx + 1 :]
        )

    def __add__(self, other):
        """Add two strings together. If other is not a String instance,
        a fields.String instance will still be returned

        :other: a String field or anything ``PYSTR`` accepts
        :returns: a new String field holding the concatenation
        """
        res_field = String()
        if isinstance(other, String):
            res = self._pfp__value + other._pfp__value
        else:
            res = self._pfp__value + utils.binary(PYSTR(other))

        res_field._pfp__set_value(res)
        return res_field

    def __iadd__(self, other):
        """In-place addition to this String field

        :other: a String field or anything ``PYSTR`` accepts
        :returns: self
        """
        if isinstance(other, String):
            self._pfp__value += other._pfp__value
        else:
            # convert to binary exactly as __add__ does; the stored value is
            # binary and appending the text form of PYSTR directly fails on
            # Python 3
            self._pfp__value += utils.binary(PYSTR(other))
        return self

    def __len__(self):
        return len(self._pfp__value)
@inherit_hash
class WString(String):
    """A ``String`` variant read two bytes at a time and terminated by two
    null bytes; the parsed bytes are decoded as UTF-16LE.
    """

    width = -1
    read_size = 2
    terminator = utils.binary("\x00\x00")

    def _pfp__parse(self, stream, save_offset=False):
        """Parse like ``String``, then decode the collected bytes as
        UTF-16LE and store the result via ``utils.binary``.
        """
        String._pfp__parse(self, stream, save_offset)
        decoded = self._pfp__value.decode("utf-16le")
        self._pfp__value = utils.binary(decoded)

    def _pfp__build(self, stream=None, save_offset=False):
        """Build the wide string.

        The stored value is decoded as ISO-8859-1, re-encoded as UTF-16LE
        and followed by a two-byte null terminator.

        :returns: the built bytes when ``stream`` is ``None``, otherwise the
            number of bytes written
        """
        if stream is not None and save_offset:
            self._pfp__offset = stream.tell()

        widened = self._pfp__value.decode("ISO-8859-1").encode("utf-16le")
        payload = widened + b"\x00\x00"

        if stream is not None:
            stream.write(payload)
            return len(payload)
        return payload