miasm.os_dep.linux.environment module
from __future__ import print_function from collections import namedtuple import functools import logging import os import re import struct import termios from future.utils import viewitems from miasm.core.interval import interval from miasm.jitter.csts import PAGE_READ, PAGE_WRITE REGEXP_T = type(re.compile('')) StatInfo = namedtuple("StatInfo", [ "st_dev", "st_ino", "st_nlink", "st_mode", "st_uid", "st_gid", "st_rdev", "st_size", "st_blksize", "st_blocks", "st_atime", "st_atimensec", "st_mtime", "st_mtimensec", "st_ctime", "st_ctimensec" ]) StatFSInfo = namedtuple("StatFSInfo", [ "f_type", "f_bsize", "f_blocks", "f_bfree", "f_bavail", "f_files", "f_ffree", "f_fsid", "f_namelen", "f_frsize", "f_flags", "f_spare", ]) log = logging.getLogger("environment") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) log.addHandler(console_handler) log.setLevel(logging.WARNING) class FileDescriptor(object): """Stand for a file descriptor on a system According to inode(7), following types are possibles: - socket - symbolic link - regular file - block device - directory - character device - FIFO """ # st_mode's file type file_type = None # st_mode's file mode (9 least bits are file permission bits) file_mode = 0o0777 # st_dev / st_rdev cont_device_id = None device_id = 0 # inode number (st_ino) inode = None # Number of hardlink (st_nlink) nlink = 0 # Owner / group uid = None gid = None # Size (st_size / st_blksize / st_blocks) size = 0 blksize = 0 blocks = 0 # Times atime = 0 atimensec = 0 mtime = 0 mtimensec = 0 ctime = 0 ctimensec = 0 def __init__(self, number): self.number = number self.is_closed = False def stat(self): mode = self.file_type | self.file_mode return StatInfo( st_dev=self.cont_device_id, st_ino=self.inode, st_nlink=self.nlink, st_mode=mode, st_uid=self.uid, st_gid=self.gid, st_rdev=self.device_id, st_size=self.size, st_blksize=self.blksize, st_blocks=self.blocks, st_atime=self.atime, 
st_atimensec=self.atimensec, st_mtime=self.mtime, st_mtimensec=self.mtimensec, st_ctime=self.ctime, st_ctimensec=self.ctimensec ) def close(self): self.is_closed = True class FileDescriptorCharDevice(FileDescriptor): file_type = 0o0020000 # S_IFCHR file_mode = 0o0620 cont_device_id = 1 device_id = 1 class FileDescriptorSTDIN(FileDescriptorCharDevice): """Special file descriptor standinf for STDIN""" inode = 0 def read(self, count): raise RuntimeError("Not implemented") class FileDescriptorSTDOUT(FileDescriptorCharDevice): """Special file descriptor standinf for STDOUT""" inode = 1 def write(self, data): print("[STDOUT] %s" % data.rstrip()) class FileDescriptorSTDERR(FileDescriptorCharDevice): """Special file descriptor standinf for STDERR""" inode = 2 def write(self, data): print("[STDERR] %s" % data.rstrip()) class FileDescriptorDirectory(FileDescriptor): """FileDescription designing a directory""" file_type = 0o0040000 # S_IFDIR def __init__(self, number, flags, filesystem, real_path): super(FileDescriptorDirectory, self).__init__(number) self.filesystem = filesystem self.real_path = real_path self.cur_listdir = None self.flags = flags def listdir(self): if self.cur_listdir is None: self.cur_listdir = os.listdir(self.real_path) while self.cur_listdir: yield self.cur_listdir.pop() class FileDescriptorRegularFile(FileDescriptor): """FileDescriptor designing a regular file""" file_type = 0o0100000 # S_IFREG def __init__(self, number, flags, filesystem, real_fd): super(FileDescriptorRegularFile, self).__init__(number) self.flags = flags self.filesystem = filesystem self.real_fd = real_fd def write(self, data): raise RuntimeError("Not implemented") def read(self, count): return os.read(self.real_fd, count) def close(self): super(FileDescriptorRegularFile, self).close() return os.close(self.real_fd) def lseek(self, offset, whence): return os.lseek(self.real_fd, offset, whence) # SEEK_SET def tell(self): return self.lseek(0, 1) # SEEK_CUR def seek(self, offset): return 
self.lseek(offset, 0) # SEEK_SET class FileDescriptorSocket(FileDescriptor): """FileDescription standing for a socket""" file_type = 0o0140000 # S_IFSOCK def __init__(self, number, family, type_, protocol): super(FileDescriptorSocket, self).__init__(number) self.family = family self.type_ = type_ self.protocol = protocol class FileSystem(object): """File system abstraction Provides standard operations on the filesystem, (a bit like FUSE) API using FileSystem only used sandbox-side path. FileSystem should be the only object able to interact with real path, outside the sandbox. Thus, if `resolve_path` is correctly implemented and used, it should not be possible to modify files outside the sandboxed path """ device_id = 0x1234 # ID of device containing file (stat.st_dev) blocksize = 0x1000 # Size of block on this filesystem f_type = 0xef53 # (Type of filesystem) EXT4_SUPER_MAGIC nb_total_block = 0x1000 nb_free_block = 0x100 nb_avail_block = nb_free_block # Available to unprivileged user nb_total_fnode = 100 # Total file nodes in filesystem nb_free_fnode = 50 max_filename_len = 256 fragment_size = 0 mount_flags = 0 def __init__(self, base_path, linux_env): self.base_path = base_path self.linux_env = linux_env self.passthrough = [] self.path_to_inode = {} # Real path (post-resolution) -> inode number def resolve_path(self, path, follow_link=True): """Resolve @path to the corresponding sandboxed path""" # path_bytes is used for Python 2 / Python 3 compatibility path_bytes = not isinstance(path, str) path_sep = os.path.sep.encode() if path_bytes else os.path.sep if path_bytes: def _convert(subpath): if not isinstance(subpath, str): return subpath return subpath.encode() def _convert_re(expr): if isinstance(expr.pattern, str): try: return re.compile( expr.pattern.encode(), flags=expr.flags & ~re.UNICODE ) except UnicodeEncodeError: # Will never match log.warning( 'Cannot convert regexp to bytes %r %r', expr.pattern, expr.flags, exc_info=True, ) return re.compile(b'$X') 
return expr else: def _convert(subpath): if not isinstance(subpath, str): return subpath.decode() return subpath def _convert_re(expr): if not isinstance(expr.pattern, str): try: return re.compile( expr.pattern.decode(), flags=expr.flags & re.UNICODE ) except UnicodeDecodeError: # Will never match log.warning( 'Cannot convert regexp to str %r %r', expr.pattern, expr.flags, exc_info=True, ) return re.compile('$X') return expr # Remove '../', etc. path = os.path.normpath(path) # Passthrough for passthrough in self.passthrough: if isinstance(passthrough, REGEXP_T): if _convert_re(passthrough).match(path): return path elif _convert(passthrough) == path: return path # Remove leading '/' if any path = path.lstrip(path_sep) base_path = os.path.abspath(_convert(self.base_path)) out_path = os.path.join(base_path, path) assert out_path.startswith(base_path + path_sep) if os.path.islink(out_path): link_target = os.readlink(out_path) # Link can be absolute or relative -> absolute link = os.path.normpath(os.path.join(os.path.dirname(path), link_target)) if follow_link: out_path = self.resolve_path(link) else: out_path = link return out_path def get_path_inode(self, real_path): inode = self.path_to_inode.setdefault(real_path, len(self.path_to_inode)) return inode def exists(self, path): sb_path = self.resolve_path(path) return os.path.exists(sb_path) def readlink(self, path): sb_path = self.resolve_path(path, follow_link=False) if not os.path.islink(sb_path): return None return os.readlink(sb_path) def statfs(self): return StatFSInfo( f_type=self.f_type, f_bsize=self.blocksize, f_blocks=self.nb_total_block, f_bfree=self.nb_free_block, f_bavail=self.nb_avail_block, f_files=self.nb_total_fnode, f_ffree=self.nb_free_fnode, f_fsid=self.device_id, f_namelen=self.max_filename_len, f_frsize=self.fragment_size, f_flags=self.mount_flags, f_spare=0) def getattr_(self, path, follow_link=True): sb_path = self.resolve_path(path, follow_link=follow_link) flags = self.linux_env.O_RDONLY if 
os.path.isdir(sb_path): flags |= self.linux_env.O_DIRECTORY fd = self.open_(path, flags, follow_link=follow_link) info = self.linux_env.fstat(fd) self.linux_env.close(fd) return info def open_(self, path, flags, follow_link=True): path = self.resolve_path(path, follow_link=follow_link) if not os.path.exists(path): # ENOENT (No such file or directory) return -1 fd = self.linux_env.next_fd() acc_mode = flags & self.linux_env.O_ACCMODE if os.path.isdir(path): assert flags & self.linux_env.O_DIRECTORY == self.linux_env.O_DIRECTORY if acc_mode == self.linux_env.O_RDONLY: fdesc = FileDescriptorDirectory(fd, flags, self, path) else: raise RuntimeError("Not implemented") elif os.path.isfile(path): if acc_mode == os.O_RDONLY: # Read only real_fd = os.open(path, os.O_RDONLY) else: raise RuntimeError("Not implemented") fdesc = FileDescriptorRegularFile(fd, flags, self, real_fd) elif os.path.islink(path): raise RuntimeError("Not implemented") else: raise RuntimeError("Unknown file type for %r" % path) self.linux_env.file_descriptors[fd] = fdesc # Set stat info fdesc.cont_device_id = self.device_id fdesc.inode = self.get_path_inode(path) fdesc.uid = self.linux_env.user_uid fdesc.gid = self.linux_env.user_gid size = os.path.getsize(path) fdesc.size = size fdesc.blksize = self.blocksize fdesc.blocks = (size + ((512 - (size % 512)) % 512)) // 512 return fd class Networking(object): """Network abstraction""" def __init__(self, linux_env): self.linux_env = linux_env def socket(self, family, type_, protocol): fd = self.linux_env.next_fd() fdesc = FileDescriptorSocket(fd, family, type_, protocol) self.linux_env.file_descriptors[fd] = fdesc return fd class LinuxEnvironment(object): """A LinuxEnvironment regroups information to simulate a Linux-like environment""" # To be overridden platform_arch = None # User information user_uid = 1000 user_euid = 1000 user_gid = 1000 user_egid = 1000 user_name = b"user" # Memory mapping information brk_current = 0x74000000 mmap_current = 0x75000000 # 
System information sys_sysname = b"Linux" sys_nodename = b"user-pc" sys_release = b"4.13.0-19-generic" sys_version = b"#22-Ubuntu" sys_machine = None # Filesystem filesystem_base = "file_sb" file_descriptors = None # Current process process_tid = 1000 process_pid = 1000 # Syscall restrictions ioctl_allowed = None # list of (fd, cmd), None value for wildcard ioctl_disallowed = None # list of (fd, cmd), None value for wildcard # Time base_time = 1531900000 # Arch specific constant O_ACCMODE = None O_CLOEXEC = None O_DIRECTORY = None O_LARGEFILE = None O_NONBLOCK = None O_RDONLY = None def __init__(self): stdin = FileDescriptorSTDIN(0) stdout = FileDescriptorSTDOUT(1) stderr = FileDescriptorSTDERR(2) for std in [stdin, stdout, stderr]: std.uid = self.user_uid std.gid = self.user_gid self.file_descriptors = { 0: stdin, 1: stdout, 2: stderr, } self.ioctl_allowed = [ (0, termios.TCGETS), (0, termios.TIOCGWINSZ), (0, termios.TIOCSWINSZ), (1, termios.TCGETS), (1, termios.TIOCGWINSZ), (1, termios.TIOCSWINSZ), ] self.ioctl_disallowed = [ (2, termios.TCGETS), (0, termios.TCSETSW), ] self.filesystem = FileSystem(self.filesystem_base, self) self.network = Networking(self) def next_fd(self): return len(self.file_descriptors) def clock_gettime(self): out = self.base_time self.base_time += 1 return out def open_(self, path, flags, follow_link=True): """Stub for 'open' syscall""" return self.filesystem.open_(path, flags, follow_link=follow_link) def socket(self, family, type_, protocol): """Stub for 'socket' syscall""" return self.network.socket(family, type_, protocol) def fstat(self, fd): """Get file status through fd""" fdesc = self.file_descriptors.get(fd) if fdesc is None: return None return fdesc.stat() def stat(self, path): """Get file status through path""" return self.filesystem.getattr_(path) def lstat(self, path): """Get file status through path (not following links)""" return self.filesystem.getattr_(path, follow_link=False) def close(self, fd): """Stub for 'close' 
syscall""" fdesc = self.file_descriptors.get(fd) if fdesc is None: return None return fdesc.close() def write(self, fd, data): """Stub for 'write' syscall""" fdesc = self.file_descriptors.get(fd) if fdesc is None: return None fdesc.write(data) return len(data) def read(self, fd, count): """Stub for 'read' syscall""" fdesc = self.file_descriptors.get(fd) if fdesc is None: return None return fdesc.read(count) def getdents(self, fd, count, packing_callback): """Stub for 'getdents' syscall 'getdents64' must be handled by caller (only the structure layout is modified) @fd: getdents' fd argument @count: getdents' count argument @packing_callback(cur_len, d_ino, d_type, name) -> entry """ fdesc = self.file_descriptors[fd] if not isinstance(fdesc, FileDescriptorDirectory): raise RuntimeError("Not implemented") out = b"" # fdesc.listdir continues from where it stopped for name in fdesc.listdir(): d_ino = 1 # Not the real one d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly # handle a return of DT_UNKNOWN.") entry = packing_callback(len(out), d_ino, d_type, name) if len(out) + len(entry) > count: # Report to a further call fdesc.cur_listdir.append(name) break out = out + entry return out def ioctl(self, fd, cmd, arg): """Stub for 'ioctl' syscall Return the list of element to pack back depending on target ioctl If the ioctl is disallowed, return False """ allowed = False disallowed = False for test in [(fd, cmd), (None, cmd), (fd, None)]: if test in self.ioctl_allowed: allowed = True if test in self.ioctl_disallowed: disallowed = True if allowed and disallowed: raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd)) if allowed: if cmd == termios.TCGETS: return 0, 0, 0, 0 elif cmd == termios.TIOCGWINSZ: # struct winsize # { # unsigned short ws_row; /* rows, in characters */ # unsigned short ws_col; /* columns, in characters */ # unsigned short ws_xpixel; /* horizontal size, pixels */ # unsigned short ws_ypixel; /* vertical size, pixels */ 
# }; return 1000, 360, 1000, 1000 elif cmd == termios.TIOCSWINSZ: # Ignore it return else: raise RuntimeError("Not implemented") elif disallowed: return False else: raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd)) def mmap(self, addr, len_, prot, flags, fd, off, vmmngr): """Stub for 'mmap' syscall 'mmap2' must be implemented by calling this function with off * 4096 """ if addr == 0: addr = self.mmap_current self.mmap_current += (len_ + 0x1000) & ~0xfff all_mem = vmmngr.get_all_memory() mapped = interval( [ (start, start + info["size"] - 1) for start, info in viewitems(all_mem) ] ) MAP_FIXED = 0x10 if flags & MAP_FIXED: # Alloc missing and override missing = interval([(addr, addr + len_ - 1)]) - mapped for start, stop in missing: vmmngr.add_memory_page( start, PAGE_READ|PAGE_WRITE, b"\x00" * (stop - start + 1), "mmap allocated" ) else: # Find first candidate segment nearby addr for start, stop in mapped: if stop < addr: continue rounded = (stop + 1 + 0x1000) & ~0xfff if (interval([(rounded, rounded + len_)]) & mapped).empty: addr = rounded break else: assert (interval([(addr, addr + len_)]) & mapped).empty vmmngr.add_memory_page( addr, PAGE_READ|PAGE_WRITE, b"\x00" * len_, "mmap allocated" ) if fd == 0xffffffff: MAP_ANONYMOUS = 0x20 # mman.h # fd and offset are ignored if MAP_ANONYMOUS flag is present if not(flags & MAP_ANONYMOUS) and off != 0: raise RuntimeError("Not implemented") data = b"\x00" * len_ else: fdesc = self.file_descriptors[fd] cur_pos = fdesc.tell() fdesc.seek(off) data = fdesc.read(len_) fdesc.seek(cur_pos) vmmngr.set_mem(addr, data) return addr def brk(self, addr, vmmngr): """Stub for 'brk' syscall""" if addr == 0: addr = self.brk_current else: all_mem = vmmngr.get_all_memory() mapped = interval( [ (start, start + info["size"] - 1) for start, info in viewitems(all_mem) ] ) # Alloc missing and override missing = interval([(self.brk_current, addr)]) - mapped for start, stop in missing: vmmngr.add_memory_page( start, PAGE_READ|PAGE_WRITE, 
b"\x00" * (stop - start + 1), "BRK" ) self.brk_current = addr return addr class LinuxEnvironment_x86_32(LinuxEnvironment): platform_arch = b"x86_32" sys_machine = b"x86_32" # TODO FIXME ## O_ACCMODE = 0x3 ## O_CLOEXEC = 0x80000 ## O_DIRECTORY = 0x10000 ## O_LARGEFILE = 0x8000 ## O_NONBLOCK = 0x800 ## O_RDONLY = 0 class LinuxEnvironment_x86_64(LinuxEnvironment): platform_arch = b"x86_64" sys_machine = b"x86_64" O_ACCMODE = 0x3 O_CLOEXEC = 0x80000 O_DIRECTORY = 0x10000 O_LARGEFILE = 0x8000 O_NONBLOCK = 0x800 O_RDONLY = 0 class LinuxEnvironment_arml(LinuxEnvironment): platform_arch = b"arml" sys_machine = b"arml" O_ACCMODE = 0x3 O_CLOEXEC = 0x80000 O_DIRECTORY = 0x4000 O_LARGEFILE = 0x20000 O_NONBLOCK = 0x800 O_RDONLY = 0 # ARM specific tls = 0 # get_tls: __kuser_helper_version >= 1 # cmpxchg: __kuser_helper_version >= 2 # memory_barrier: __kuser_helper_version >= 3 kuser_helper_version = 3 class AuxVec(object): """Auxiliary vector abstraction, filled with default values (mainly based on https://lwn.net/Articles/519085) # Standard usage >>> auxv = AuxVec(elf_base_addr, cont_target.entry_point, linux_env) # Enable AT_SECURE >>> auxv = AuxVec(..., AuxVec.AT_SECURE=1) # Modify AT_RANDOM >>> auxv = AuxVec(..., AuxVec.AT_RANDOM="\x00"*0x10) # Using AuxVec instance for stack preparation # First, fill memory with vectors data >>> for AT_number, data in auxv.data_to_map(): dest_ptr = ... copy_to_dest(data, dest_ptr) auxv.ptrs[AT_number] = dest_ptr # Then, get the key: value (with value being sometime a pointer) >>> for auxid, auxval in auxv.iteritems(): ... 
""" AT_PHDR = 3 AT_PHNUM = 5 AT_PAGESZ = 6 AT_ENTRY = 9 AT_UID = 11 AT_EUID = 12 AT_GID = 13 AT_EGID = 14 AT_PLATFORM = 15 AT_HWCAP = 16 AT_SECURE = 23 AT_RANDOM = 25 AT_SYSINFO_EHDR = 33 def __init__(self, elf_phdr_vaddr, entry_point, linux_env, **kwargs): """Instantiate an AuxVec, with required elements: - elf_phdr_vaddr: virtual address of the ELF's PHDR in memory - entry_point: virtual address of the ELF entry point - linux_env: LinuxEnvironment instance, used to provides some of the option values Others options can be overridden by named arguments """ self.info = { self.AT_PHDR: elf_phdr_vaddr, self.AT_PHNUM: 9, self.AT_PAGESZ: 0x1000, self.AT_ENTRY: entry_point, self.AT_UID: linux_env.user_uid, self.AT_EUID: linux_env.user_euid, self.AT_GID: linux_env.user_gid, self.AT_EGID: linux_env.user_egid, self.AT_PLATFORM: linux_env.platform_arch, self.AT_HWCAP: 0, self.AT_SECURE: 0, self.AT_RANDOM: b"\x00" * 0x10, # vDSO is not mandatory self.AT_SYSINFO_EHDR: None, } self.info.update(kwargs) self.ptrs = {} # info key -> corresponding virtual address def data_to_map(self): """Iterator on (AT_number, data) Once the data has been mapped, the corresponding ptr must be set in 'self.ptrs[AT_number]' """ for AT_number in [self.AT_PLATFORM, self.AT_RANDOM]: yield (AT_number, self.info[AT_number]) def iteritems(self): """Iterator on auxiliary vector id and values""" for AT_number, value in viewitems(self.info): if AT_number in self.ptrs: value = self.ptrs[AT_number] if value is None: # AT to ignore continue yield (AT_number, value) items = iteritems def prepare_loader_x86_64(jitter, argv, envp, auxv, linux_env, hlt_address=0x13371acc): """Fill the environment with enough information to run a linux loader @jitter: Jitter instance @argv: list of strings @envp: dict of environment variables names to their values @auxv: AuxVec instance @hlt_address (default to 0x13371acc): stopping address Example of use: >>> jitter = machine.jitter() >>> jitter.init_stack() >>> linux_env = 
LinuxEnvironment_x86_64() >>> argv = ["/bin/ls", "-lah"] >>> envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name} >>> auxv = AuxVec(elf_base_addr, entry_point, linux_env) >>> prepare_loader_x86_64(jitter, argv, envp, auxv, linux_env) # One may want to enable syscall handling here # The program can now run from the loader >>> jitter.init_run(ld_entry_point) >>> jitter.continue_run() """ # Stack layout looks like # [data] # - auxv values # - envp name=value # - argv arguments # [auxiliary vector] # [environment pointer] # [argument vector] for AT_number, data in auxv.data_to_map(): data += b"\x00" jitter.cpu.RSP -= len(data) ptr = jitter.cpu.RSP jitter.vm.set_mem(ptr, data) auxv.ptrs[AT_number] = ptr env_ptrs = [] for name, value in viewitems(envp): env = b"%s=%s\x00" % (name, value) jitter.cpu.RSP -= len(env) ptr = jitter.cpu.RSP jitter.vm.set_mem(ptr, env) env_ptrs.append(ptr) argv_ptrs = [] for arg in argv: arg += b"\x00" jitter.cpu.RSP -= len(arg) ptr = jitter.cpu.RSP jitter.vm.set_mem(ptr, arg) argv_ptrs.append(ptr) jitter.push_uint64_t(hlt_address) jitter.push_uint64_t(0) jitter.push_uint64_t(0) for auxid, auxval in viewitems(auxv): jitter.push_uint64_t(auxval) jitter.push_uint64_t(auxid) jitter.push_uint64_t(0) for ptr in reversed(env_ptrs): jitter.push_uint64_t(ptr) jitter.push_uint64_t(0) for ptr in reversed(argv_ptrs): jitter.push_uint64_t(ptr) jitter.push_uint64_t(len(argv)) def _arml__kuser_get_tls(linux_env, jitter): # __kuser_get_tls jitter.pc = jitter.cpu.LR jitter.cpu.R0 = linux_env.tls return True def _arml__kuser_cmpxchg(jitter): oldval = jitter.cpu.R0 newval = jitter.cpu.R1 ptr = jitter.cpu.R2 value = struct.unpack("<I", jitter.vm.get_mem(ptr, 4))[0] if value == oldval: jitter.vm.set_mem(ptr, struct.pack("<I", newval)) jitter.cpu.R0 = 0 jitter.cpu.cf = 1 else: jitter.cpu.R0 = -1 jitter.cpu.cf = 0 jitter.pc = jitter.cpu.LR return True def _arml__kuser_memory_barrier(jitter): # __kuser_memory_barrier jitter.pc = jitter.cpu.LR return True 
def _arml__kuser_helper_version(linux_env, jitter): jitter.pc = jitter.cpu.LR jitter.cpu.R0 = linux_env.kuser_helper_version return True def prepare_loader_arml(jitter, argv, envp, auxv, linux_env, hlt_address=0x13371acc): """Fill the environment with enough information to run a linux loader @jitter: Jitter instance @argv: list of strings @envp: dict of environment variables names to their values @auxv: AuxVec instance @hlt_address (default to 0x13371acc): stopping address Example of use: >>> jitter = machine.jitter() >>> jitter.init_stack() >>> linux_env = LinuxEnvironment_arml() >>> argv = ["/bin/ls", "-lah"] >>> envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name} >>> auxv = AuxVec(elf_base_addr, entry_point, linux_env) >>> prepare_loader_arml(jitter, argv, envp, auxv, linux_env) # One may want to enable syscall handling here # The program can now run from the loader >>> jitter.init_run(ld_entry_point) >>> jitter.continue_run() """ # Stack layout looks like # [data] # - auxv values # - envp name=value # - argv arguments # [auxiliary vector] # [environment pointer] # [argument vector] for AT_number, data in auxv.data_to_map(): data += b"\x00" jitter.cpu.SP -= len(data) ptr = jitter.cpu.SP jitter.vm.set_mem(ptr, data) auxv.ptrs[AT_number] = ptr env_ptrs = [] for name, value in viewitems(envp): env = b"%s=%s\x00" % (name, value) jitter.cpu.SP -= len(env) ptr = jitter.cpu.SP jitter.vm.set_mem(ptr, env) env_ptrs.append(ptr) argv_ptrs = [] for arg in argv: arg += b"\x00" jitter.cpu.SP -= len(arg) ptr = jitter.cpu.SP jitter.vm.set_mem(ptr, arg) argv_ptrs.append(ptr) jitter.push_uint32_t(hlt_address) jitter.push_uint32_t(0) jitter.push_uint32_t(0) for auxid, auxval in viewitems(auxv): jitter.push_uint32_t(auxval) jitter.push_uint32_t(auxid) jitter.push_uint32_t(0) for ptr in reversed(env_ptrs): jitter.push_uint32_t(ptr) jitter.push_uint32_t(0) for ptr in reversed(argv_ptrs): jitter.push_uint32_t(ptr) jitter.push_uint32_t(len(argv)) # Add kernel user helpers # 
from Documentation/arm/kernel_user_helpers.txt if linux_env.kuser_helper_version >= 1: jitter.add_breakpoint( 0xFFFF0FE0, functools.partial(_arml__kuser_get_tls, linux_env) ) if linux_env.kuser_helper_version >= 2: jitter.add_breakpoint(0XFFFF0FC0, _arml__kuser_cmpxchg) if linux_env.kuser_helper_version >= 3: jitter.add_breakpoint(0xFFFF0FA0, _arml__kuser_memory_barrier) jitter.add_breakpoint(0xffff0ffc, _arml__kuser_helper_version)
Module variables
var PAGE_READ
var PAGE_WRITE
var console_handler
var log
Functions
def prepare_loader_arml(
jitter, argv, envp, auxv, linux_env, hlt_address=322378444)
Fill the environment with enough information to run a linux loader
@jitter: Jitter instance @argv: list of strings @envp: dict of environment variables names to their values @auxv: AuxVec instance @hlt_address (default to 0x13371acc): stopping address
Example of use:
jitter = machine.jitter() jitter.init_stack() linux_env = LinuxEnvironment_arml() argv = ["/bin/ls", "-lah"] envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name} auxv = AuxVec(elf_base_addr, entry_point, linux_env) prepare_loader_arml(jitter, argv, envp, auxv, linux_env)
One may want to enable syscall handling here
The program can now run from the loader
jitter.init_run(ld_entry_point) jitter.continue_run()
def prepare_loader_arml(jitter, argv, envp, auxv, linux_env,
                        hlt_address=0x13371acc):
    """Fill the environment with enough information to run a linux loader

    @jitter: Jitter instance
    @argv: list of strings (str or bytes)
    @envp: dict of environment variables names to their values (str or bytes)
    @auxv: AuxVec instance
    @hlt_address (default to 0x13371acc): stopping address

    Example of use:
    >>> jitter = machine.jitter()
    >>> jitter.init_stack()
    >>> linux_env = LinuxEnvironment_arml()
    >>> argv = ["/bin/ls", "-lah"]
    >>> envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name}
    >>> auxv = AuxVec(elf_base_addr, entry_point, linux_env)
    >>> prepare_loader_arml(jitter, argv, envp, auxv, linux_env)
    # One may want to enable syscall handling here
    # The program can now run from the loader
    >>> jitter.init_run(ld_entry_point)
    >>> jitter.continue_run()
    """
    # Stack layout looks like
    # [data]
    #  - auxv values
    #  - envp name=value
    #  - argv arguments
    # [auxiliary vector]
    # [environment pointer]
    # [argument vector]

    def _as_bytes(value):
        # Text strings are encoded first: bytes formatting / concatenation
        # rejects them on Python 3
        return value if isinstance(value, bytes) else value.encode()

    for AT_number, data in auxv.data_to_map():
        data = _as_bytes(data) + b"\x00"
        jitter.cpu.SP -= len(data)
        ptr = jitter.cpu.SP
        jitter.vm.set_mem(ptr, data)
        auxv.ptrs[AT_number] = ptr

    env_ptrs = []
    for name, value in envp.items():
        env = b"%s=%s\x00" % (_as_bytes(name), _as_bytes(value))
        jitter.cpu.SP -= len(env)
        ptr = jitter.cpu.SP
        jitter.vm.set_mem(ptr, env)
        env_ptrs.append(ptr)

    argv_ptrs = []
    for arg in argv:
        arg = _as_bytes(arg) + b"\x00"
        jitter.cpu.SP -= len(arg)
        ptr = jitter.cpu.SP
        jitter.vm.set_mem(ptr, arg)
        argv_ptrs.append(ptr)

    jitter.push_uint32_t(hlt_address)
    jitter.push_uint32_t(0)
    jitter.push_uint32_t(0)
    for auxid, auxval in auxv.items():
        jitter.push_uint32_t(auxval)
        jitter.push_uint32_t(auxid)
    jitter.push_uint32_t(0)
    for ptr in reversed(env_ptrs):
        jitter.push_uint32_t(ptr)
    jitter.push_uint32_t(0)
    for ptr in reversed(argv_ptrs):
        jitter.push_uint32_t(ptr)
    jitter.push_uint32_t(len(argv))

    # Add kernel user helpers
    # from Documentation/arm/kernel_user_helpers.txt

    if linux_env.kuser_helper_version >= 1:
        jitter.add_breakpoint(
            0xFFFF0FE0,
            functools.partial(_arml__kuser_get_tls, linux_env)
        )

    if linux_env.kuser_helper_version >= 2:
        jitter.add_breakpoint(0XFFFF0FC0, _arml__kuser_cmpxchg)

    if linux_env.kuser_helper_version >= 3:
        jitter.add_breakpoint(0xFFFF0FA0, _arml__kuser_memory_barrier)

    # The callback expects (linux_env, jitter): bind linux_env, as done for
    # _arml__kuser_get_tls
    jitter.add_breakpoint(
        0xffff0ffc,
        functools.partial(_arml__kuser_helper_version, linux_env)
    )
def prepare_loader_x86_64(
jitter, argv, envp, auxv, linux_env, hlt_address=322378444)
Fill the environment with enough information to run a linux loader
@jitter: Jitter instance @argv: list of strings @envp: dict of environment variables names to their values @auxv: AuxVec instance @hlt_address (default to 0x13371acc): stopping address
Example of use:
jitter = machine.jitter() jitter.init_stack() linux_env = LinuxEnvironment_x86_64() argv = ["/bin/ls", "-lah"] envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name} auxv = AuxVec(elf_base_addr, entry_point, linux_env) prepare_loader_x86_64(jitter, argv, envp, auxv, linux_env)
One may want to enable syscall handling here
The program can now run from the loader
jitter.init_run(ld_entry_point) jitter.continue_run()
def prepare_loader_x86_64(jitter, argv, envp, auxv, linux_env,
                          hlt_address=0x13371acc):
    """Fill the environment with enough information to run a linux loader

    @jitter: Jitter instance
    @argv: list of strings (str or bytes)
    @envp: dict of environment variables names to their values (str or bytes)
    @auxv: AuxVec instance
    @hlt_address (default to 0x13371acc): stopping address

    Example of use:
    >>> jitter = machine.jitter()
    >>> jitter.init_stack()
    >>> linux_env = LinuxEnvironment_x86_64()
    >>> argv = ["/bin/ls", "-lah"]
    >>> envp = {"PATH": "/usr/local/bin", "USER": linux_env.user_name}
    >>> auxv = AuxVec(elf_base_addr, entry_point, linux_env)
    >>> prepare_loader_x86_64(jitter, argv, envp, auxv, linux_env)
    # One may want to enable syscall handling here
    # The program can now run from the loader
    >>> jitter.init_run(ld_entry_point)
    >>> jitter.continue_run()
    """
    # Stack layout looks like
    # [data]
    #  - auxv values
    #  - envp name=value
    #  - argv arguments
    # [auxiliary vector]
    # [environment pointer]
    # [argument vector]

    def _as_bytes(value):
        # Text strings are encoded first: bytes formatting / concatenation
        # rejects them on Python 3
        return value if isinstance(value, bytes) else value.encode()

    for AT_number, data in auxv.data_to_map():
        data = _as_bytes(data) + b"\x00"
        jitter.cpu.RSP -= len(data)
        ptr = jitter.cpu.RSP
        jitter.vm.set_mem(ptr, data)
        auxv.ptrs[AT_number] = ptr

    env_ptrs = []
    for name, value in envp.items():
        env = b"%s=%s\x00" % (_as_bytes(name), _as_bytes(value))
        jitter.cpu.RSP -= len(env)
        ptr = jitter.cpu.RSP
        jitter.vm.set_mem(ptr, env)
        env_ptrs.append(ptr)

    argv_ptrs = []
    for arg in argv:
        arg = _as_bytes(arg) + b"\x00"
        jitter.cpu.RSP -= len(arg)
        ptr = jitter.cpu.RSP
        jitter.vm.set_mem(ptr, arg)
        argv_ptrs.append(ptr)

    jitter.push_uint64_t(hlt_address)
    jitter.push_uint64_t(0)
    jitter.push_uint64_t(0)
    for auxid, auxval in auxv.items():
        jitter.push_uint64_t(auxval)
        jitter.push_uint64_t(auxid)
    jitter.push_uint64_t(0)
    for ptr in reversed(env_ptrs):
        jitter.push_uint64_t(ptr)
    jitter.push_uint64_t(0)
    for ptr in reversed(argv_ptrs):
        jitter.push_uint64_t(ptr)
    jitter.push_uint64_t(len(argv))
Classes
class AuxVec
Auxiliary vector abstraction, filled with default values (mainly based on https://lwn.net/Articles/519085)
Standard usage
auxv = AuxVec(elf_base_addr, cont_target.entry_point, linux_env)
Enable AT_SECURE
auxv = AuxVec(..., AuxVec.AT_SECURE=1)
Modify AT_RANDOM
auxv = AuxVec(..., AuxVec.AT_RANDOM="\x00"*0x10)
Using AuxVec instance for stack preparation
First, fill memory with vectors data
for AT_number, data in auxv.data_to_map(): dest_ptr = ... copy_to_dest(data, dest_ptr) auxv.ptrs[AT_number] = dest_ptr
Then, get the key: value (with value sometimes being a pointer)
for auxid, auxval in auxv.iteritems(): ...
class AuxVec(object):
    """Auxiliary vector abstraction, filled with default values
    (mainly based on https://lwn.net/Articles/519085)

    # Standard usage
    >>> auxv = AuxVec(elf_base_addr, cont_target.entry_point, linux_env)

    # Enable AT_SECURE
    >>> auxv = AuxVec(..., AT_SECURE=1)

    # Modify AT_RANDOM
    >>> auxv = AuxVec(..., AT_RANDOM=b"\\x00"*0x10)

    # Using AuxVec instance for stack preparation
    # First, fill memory with vectors data
    >>> for AT_number, data in auxv.data_to_map():
            dest_ptr = ...
            copy_to_dest(data, dest_ptr)
            auxv.ptrs[AT_number] = dest_ptr
    # Then, get the key: value (with value being sometime a pointer)
    >>> for auxid, auxval in auxv.iteritems():
            ...
    """

    AT_PHDR = 3
    AT_PHNUM = 5
    AT_PAGESZ = 6
    AT_ENTRY = 9
    AT_UID = 11
    AT_EUID = 12
    AT_GID = 13
    AT_EGID = 14
    AT_PLATFORM = 15
    AT_HWCAP = 16
    AT_SECURE = 23
    AT_RANDOM = 25
    AT_SYSINFO_EHDR = 33

    def __init__(self, elf_phdr_vaddr, entry_point, linux_env, **kwargs):
        """Instantiate an AuxVec, with required elements:
        - elf_phdr_vaddr: virtual address of the ELF's PHDR in memory
        - entry_point: virtual address of the ELF entry point
        - linux_env: LinuxEnvironment instance, used to provides some of the
          option values

        Others options can be overridden by named arguments, the argument
        name being the name of the AT_* class constant (ie. AT_SECURE=1).
        An unknown option name raises a TypeError.
        """
        self.info = {
            self.AT_PHDR: elf_phdr_vaddr,
            self.AT_PHNUM: 9,
            self.AT_PAGESZ: 0x1000,
            self.AT_ENTRY: entry_point,
            self.AT_UID: linux_env.user_uid,
            self.AT_EUID: linux_env.user_euid,
            self.AT_GID: linux_env.user_gid,
            self.AT_EGID: linux_env.user_egid,
            self.AT_PLATFORM: linux_env.platform_arch,
            self.AT_HWCAP: 0,
            self.AT_SECURE: 0,
            self.AT_RANDOM: b"\x00" * 0x10,
            # vDSO is not mandatory
            self.AT_SYSINFO_EHDR: None,
        }
        # Keyword names are always strings, whereas `info` is indexed by the
        # numeric AT_* identifiers: translate each name to its number so the
        # override replaces the default instead of adding a string key
        for name, value in kwargs.items():
            at_number = None
            if name.startswith("AT_"):
                at_number = getattr(type(self), name, None)
            if not isinstance(at_number, int):
                raise TypeError("Unknown auxiliary vector option %r" % name)
            self.info[at_number] = value
        self.ptrs = {}  # info key -> corresponding virtual address

    def data_to_map(self):
        """Iterator on (AT_number, data)
        Once the data has been mapped, the corresponding ptr must be set in
        'self.ptrs[AT_number]'
        """
        for AT_number in [self.AT_PLATFORM, self.AT_RANDOM]:
            yield (AT_number, self.info[AT_number])

    def iteritems(self):
        """Iterator on auxiliary vector id and values"""
        for AT_number, value in viewitems(self.info):
            if AT_number in self.ptrs:
                # The value lives in memory: expose its address instead
                value = self.ptrs[AT_number]
            if value is None:
                # AT to ignore
                continue
            yield (AT_number, value)

    items = iteritems
Ancestors (in MRO)
- AuxVec
- builtins.object
Class variables
var AT_EGID
var AT_ENTRY
var AT_EUID
var AT_GID
var AT_HWCAP
var AT_PAGESZ
var AT_PHDR
var AT_PHNUM
var AT_PLATFORM
var AT_RANDOM
var AT_SECURE
var AT_SYSINFO_EHDR
var AT_UID
Static methods
def __init__(
self, elf_phdr_vaddr, entry_point, linux_env, **kwargs)
Instantiate an AuxVec, with required elements: - elf_phdr_vaddr: virtual address of the ELF's PHDR in memory - entry_point: virtual address of the ELF entry point - linux_env: LinuxEnvironment instance, used to provides some of the option values
Others options can be overridden by named arguments
def __init__(self, elf_phdr_vaddr, entry_point, linux_env, **kwargs): """Instantiate an AuxVec, with required elements: - elf_phdr_vaddr: virtual address of the ELF's PHDR in memory - entry_point: virtual address of the ELF entry point - linux_env: LinuxEnvironment instance, used to provides some of the option values Others options can be overridden by named arguments """ self.info = { self.AT_PHDR: elf_phdr_vaddr, self.AT_PHNUM: 9, self.AT_PAGESZ: 0x1000, self.AT_ENTRY: entry_point, self.AT_UID: linux_env.user_uid, self.AT_EUID: linux_env.user_euid, self.AT_GID: linux_env.user_gid, self.AT_EGID: linux_env.user_egid, self.AT_PLATFORM: linux_env.platform_arch, self.AT_HWCAP: 0, self.AT_SECURE: 0, self.AT_RANDOM: b"\x00" * 0x10, # vDSO is not mandatory self.AT_SYSINFO_EHDR: None, } self.info.update(kwargs) self.ptrs = {} # info key -> corresponding virtual address
def data_to_map(
self)
Iterator on (AT_number, data) Once the data has been mapped, the corresponding ptr must be set in 'self.ptrs[AT_number]'
def data_to_map(self):
    """Yield (AT_number, data) for the entries whose value is raw data

    Only AT_PLATFORM and AT_RANDOM are concerned. Once a data has been
    mapped, its address must be stored in 'self.ptrs[AT_number]'.
    """
    in_memory_ids = (self.AT_PLATFORM, self.AT_RANDOM)
    for at_id in in_memory_ids:
        payload = self.info[at_id]
        yield at_id, payload
def items(
self)
Iterator on auxiliary vector id and values
def iteritems(self):
    """Yield (AT_number, value) for each entry of the auxiliary vector

    When an address has been registered in 'self.ptrs' for an entry, this
    address is yielded in place of the raw value. Entries whose final value
    is None are skipped.
    """
    for at_id, raw_value in viewitems(self.info):
        resolved = self.ptrs[at_id] if at_id in self.ptrs else raw_value
        if resolved is not None:
            yield (at_id, resolved)
def iteritems(
self)
Iterator on auxiliary vector id and values
def iteritems(self):
    """Yield (AT_number, value) for each entry of the auxiliary vector

    When an address has been registered in 'self.ptrs' for an entry, this
    address is yielded in place of the raw value. Entries whose final value
    is None are skipped.
    """
    for at_id, raw_value in viewitems(self.info):
        resolved = self.ptrs[at_id] if at_id in self.ptrs else raw_value
        if resolved is not None:
            yield (at_id, resolved)
Instance variables
var info
var ptrs
class FileDescriptor
Stand for a file descriptor on a system
According to inode(7), the following types are possible: - socket - symbolic link - regular file - block device - directory - character device - FIFO
class FileDescriptor(object):
    """Stand for a file descriptor on a system

    According to inode(7), the following types are possible:
    - socket
    - symbolic link
    - regular file
    - block device
    - directory
    - character device
    - FIFO

    The class attributes are the defaults reported by `stat`; subclasses
    and instances override the ones they need.
    """

    # st_mode: file type bits, to be provided by subclasses
    file_type = None
    # st_mode: file mode bits (9 least bits are file permission bits)
    file_mode = 0o0777

    # st_dev (device containing the file) / st_rdev
    cont_device_id = None
    device_id = 0

    # st_ino: inode number
    inode = None

    # st_nlink: number of hardlink
    nlink = 0

    # st_uid / st_gid: owner and group
    uid = None
    gid = None

    # st_size / st_blksize / st_blocks
    size = 0
    blksize = 0
    blocks = 0

    # Access, modification and status change times
    atime = 0
    atimensec = 0
    mtime = 0
    mtimensec = 0
    ctime = 0
    ctimensec = 0

    def __init__(self, number):
        self.number = number
        self.is_closed = False

    def stat(self):
        """Return the StatInfo describing this descriptor"""
        type_and_perms = self.file_type | self.file_mode
        fields = {
            "st_dev": self.cont_device_id,
            "st_ino": self.inode,
            "st_nlink": self.nlink,
            "st_mode": type_and_perms,
            "st_uid": self.uid,
            "st_gid": self.gid,
            "st_rdev": self.device_id,
            "st_size": self.size,
            "st_blksize": self.blksize,
            "st_blocks": self.blocks,
            "st_atime": self.atime,
            "st_atimensec": self.atimensec,
            "st_mtime": self.mtime,
            "st_mtimensec": self.mtimensec,
            "st_ctime": self.ctime,
            "st_ctimensec": self.ctimensec,
        }
        return StatInfo(**fields)

    def close(self):
        """Flag the descriptor as closed"""
        self.is_closed = True
Ancestors (in MRO)
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var inode
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number):
    """Create the descriptor identified by @number, initially not closed"""
    self.number = number
    self.is_closed = False
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor

    st_mode is the file type OR-ed with the file mode; the other fields are
    copied from the attributes of the same meaning.
    """
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
Instance variables
var is_closed
var number
class FileDescriptorCharDevice
Stand for a file descriptor on a system
According to inode(7), the following types are possible: - socket - symbolic link - regular file - block device - directory - character device - FIFO
class FileDescriptorCharDevice(FileDescriptor):
    """FileDescriptor designating a character device"""
    # S_IFCHR
    file_type = 0o0020000
    # Permission bits reported in st_mode
    file_mode = 0o0620
    # st_dev / st_rdev
    cont_device_id = 1
    device_id = 1
Ancestors (in MRO)
- FileDescriptorCharDevice
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var gid
var inode
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number):
    """Create the descriptor identified by @number, initially not closed"""
    self.number = number
    self.is_closed = False
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
class FileDescriptorDirectory
FileDescriptor designating a directory
class FileDescriptorDirectory(FileDescriptor):
    """FileDescriptor designating a directory"""

    # S_IFDIR
    file_type = 0o0040000

    def __init__(self, number, flags, filesystem, real_path):
        super(FileDescriptorDirectory, self).__init__(number)
        self.flags = flags
        self.filesystem = filesystem
        self.real_path = real_path
        # Entries still to be reported, filled on first `listdir` call
        self.cur_listdir = None

    def listdir(self):
        """Yield the directory entries not yet consumed

        The listing is read once, then entries are popped from its end, so
        a later call continues from where the previous one stopped.
        """
        if self.cur_listdir is None:
            self.cur_listdir = os.listdir(self.real_path)
        while True:
            if not self.cur_listdir:
                return
            yield self.cur_listdir.pop()
Ancestors (in MRO)
- FileDescriptorDirectory
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var inode
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number, flags, filesystem, real_path)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number, flags, filesystem, real_path):
    """Create a directory descriptor

    @number: descriptor number
    @flags: flags the directory has been opened with
    @filesystem: object stored as the owner of this descriptor
    @real_path: path handed to os.listdir when entries are requested
    """
    super(FileDescriptorDirectory, self).__init__(number)
    self.flags = flags
    self.filesystem = filesystem
    self.real_path = real_path
    # Entries still to be reported, filled on first `listdir` call
    self.cur_listdir = None
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def listdir(
self)
def listdir(self):
    """Yield the directory entries not yet consumed

    The listing is read once, then entries are popped from its end, so a
    later call continues from where the previous one stopped.
    """
    if self.cur_listdir is None:
        self.cur_listdir = os.listdir(self.real_path)
    while True:
        if not self.cur_listdir:
            return
        yield self.cur_listdir.pop()
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
Instance variables
var cur_listdir
var filesystem
var flags
var real_path
class FileDescriptorRegularFile
FileDescriptor designating a regular file
class FileDescriptorRegularFile(FileDescriptor):
    """FileDescriptor designating a regular file"""

    # S_IFREG
    file_type = 0o0100000

    def __init__(self, number, flags, filesystem, real_fd):
        super(FileDescriptorRegularFile, self).__init__(number)
        self.real_fd = real_fd
        self.filesystem = filesystem
        self.flags = flags

    def write(self, data):
        """Writing is not supported"""
        raise RuntimeError("Not implemented")

    def read(self, count):
        """Read up to @count bytes from the underlying descriptor"""
        chunk = os.read(self.real_fd, count)
        return chunk

    def close(self):
        """Flag the descriptor as closed, then close the underlying one"""
        super(FileDescriptorRegularFile, self).close()
        status = os.close(self.real_fd)
        return status

    def lseek(self, offset, whence):
        """Move the underlying descriptor position, @whence being forwarded
        as is to os.lseek; return the resulting position"""
        position = os.lseek(self.real_fd, offset, whence)
        return position

    def tell(self):
        """Return the current position"""
        return self.lseek(0, os.SEEK_CUR)

    def seek(self, offset):
        """Set the position to @offset, counted from the file start"""
        return self.lseek(offset, os.SEEK_SET)
Ancestors (in MRO)
- FileDescriptorRegularFile
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var inode
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number, flags, filesystem, real_fd)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number, flags, filesystem, real_fd):
    """Create a regular file descriptor

    @number: descriptor number
    @flags: flags the file has been opened with
    @filesystem: object stored as the owner of this descriptor
    @real_fd: descriptor used for the os.read / os.lseek / os.close calls
    """
    super(FileDescriptorRegularFile, self).__init__(number)
    self.real_fd = real_fd
    self.filesystem = filesystem
    self.flags = flags
def close(
self)
def close(self):
    """Flag the descriptor as closed, then close the underlying one

    Return the result of os.close.
    """
    super(FileDescriptorRegularFile, self).close()
    status = os.close(self.real_fd)
    return status
def lseek(
self, offset, whence)
def lseek(self, offset, whence):
    """Move the position of the underlying descriptor

    @offset: offset to apply
    @whence: reference of @offset, forwarded as is to os.lseek
    Return the resulting position.
    """
    position = os.lseek(self.real_fd, offset, whence)
    return position
def read(
self, count)
def read(self, count):
    """Read up to @count bytes from the underlying descriptor"""
    chunk = os.read(self.real_fd, count)
    return chunk
def seek(
self, offset)
def seek(self, offset):
    """Set the position to @offset, counted from the file start

    Return the value returned by `lseek`.
    """
    return self.lseek(offset, os.SEEK_SET)
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
def tell(
self)
def tell(self):
    """Return the current position, obtained through a null relative
    `lseek`"""
    return self.lseek(0, os.SEEK_CUR)
def write(
self, data)
def write(self, data):
    """Writing is not supported: always raise a RuntimeError"""
    raise RuntimeError("Not implemented")
Instance variables
var filesystem
var flags
var real_fd
class FileDescriptorSTDERR
Special file descriptor standing for STDERR
class FileDescriptorSTDERR(FileDescriptorCharDevice):
    """Special file descriptor standing for STDERR"""

    inode = 2

    def write(self, data):
        """Print @data, right-stripped, behind a "[STDERR]" tag"""
        line = "[STDERR] %s" % data.rstrip()
        print(line)
Ancestors (in MRO)
- FileDescriptorSTDERR
- FileDescriptorCharDevice
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number)
Inheritance:
FileDescriptor
.__init__
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number):
    """Create the descriptor identified by @number, initially not closed"""
    self.number = number
    self.is_closed = False
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
def write(
self, data)
def write(self, data):
    """Print @data, right-stripped, behind a "[STDERR]" tag"""
    line = "[STDERR] %s" % data.rstrip()
    print(line)
class FileDescriptorSTDIN
Special file descriptor standing for STDIN
class FileDescriptorSTDIN(FileDescriptorCharDevice):
    """Special file descriptor standing for STDIN"""

    inode = 0

    def read(self, count):
        """Reading is not supported: always raise a RuntimeError"""
        raise RuntimeError("Not implemented")
Ancestors (in MRO)
- FileDescriptorSTDIN
- FileDescriptorCharDevice
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number)
Inheritance:
FileDescriptor
.__init__
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number):
    """Create the descriptor identified by @number, initially not closed"""
    self.number = number
    self.is_closed = False
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def read(
self, count)
def read(self, count):
    """Reading is not supported: always raise a RuntimeError"""
    raise RuntimeError("Not implemented")
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
class FileDescriptorSTDOUT
Special file descriptor standing for STDOUT
class FileDescriptorSTDOUT(FileDescriptorCharDevice):
    """Special file descriptor standing for STDOUT"""

    inode = 1

    def write(self, data):
        """Print @data, right-stripped, behind a "[STDOUT]" tag"""
        line = "[STDOUT] %s" % data.rstrip()
        print(line)
Ancestors (in MRO)
- FileDescriptorSTDOUT
- FileDescriptorCharDevice
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number)
Inheritance:
FileDescriptor
.__init__
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number):
    """Create the descriptor identified by @number, initially not closed"""
    self.number = number
    self.is_closed = False
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
def write(
self, data)
def write(self, data):
    """Print @data, right-stripped, behind a "[STDOUT]" tag"""
    line = "[STDOUT] %s" % data.rstrip()
    print(line)
class FileDescriptorSocket
FileDescriptor standing for a socket
class FileDescriptorSocket(FileDescriptor):
    """FileDescriptor standing for a socket"""

    # S_IFSOCK
    file_type = 0o0140000

    def __init__(self, number, family, type_, protocol):
        super(FileDescriptorSocket, self).__init__(number)
        # Arguments the socket has been created with, kept as is
        self.protocol = protocol
        self.type_ = type_
        self.family = family
Ancestors (in MRO)
- FileDescriptorSocket
- FileDescriptor
- builtins.object
Class variables
var atime
var atimensec
var blksize
var blocks
var cont_device_id
var ctime
var ctimensec
var device_id
var file_mode
var file_type
var gid
var inode
var mtime
var mtimensec
var nlink
var size
var uid
Static methods
def __init__(
self, number, family, type_, protocol)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, number, family, type_, protocol):
    """Create a socket descriptor

    @number: descriptor number
    @family, @type_, @protocol: arguments the socket has been created with,
    kept as is
    """
    super(FileDescriptorSocket, self).__init__(number)
    self.protocol = protocol
    self.type_ = type_
    self.family = family
def close(
self)
def close(self):
    """Flag the descriptor as closed"""
    self.is_closed = True
def stat(
self)
def stat(self):
    """Return the StatInfo describing this descriptor"""
    type_and_perms = self.file_type | self.file_mode
    fields = {
        "st_dev": self.cont_device_id,
        "st_ino": self.inode,
        "st_nlink": self.nlink,
        "st_mode": type_and_perms,
        "st_uid": self.uid,
        "st_gid": self.gid,
        "st_rdev": self.device_id,
        "st_size": self.size,
        "st_blksize": self.blksize,
        "st_blocks": self.blocks,
        "st_atime": self.atime,
        "st_atimensec": self.atimensec,
        "st_mtime": self.mtime,
        "st_mtimensec": self.mtimensec,
        "st_ctime": self.ctime,
        "st_ctimensec": self.ctimensec,
    }
    return StatInfo(**fields)
Instance variables
var family
var protocol
var type_
class FileSystem
File system abstraction Provides standard operations on the filesystem, (a bit like FUSE)
APIs using FileSystem only use sandbox-side paths. FileSystem should be the only object able to interact with real paths, outside the sandbox.
Thus, if resolve_path is correctly implemented and used, it should not be possible to modify files outside the sandboxed path.
class FileSystem(object):
    """File system abstraction
    Provides standard operations on the filesystem, (a bit like FUSE)

    APIs using FileSystem only use sandbox-side paths. FileSystem should be
    the only object able to interact with real paths, outside the sandbox.

    Thus, if `resolve_path` is correctly implemented and used, it should not
    be possible to modify files outside the sandboxed path
    """

    device_id = 0x1234  # ID of device containing file (stat.st_dev)
    blocksize = 0x1000  # Size of block on this filesystem
    f_type = 0xef53  # (Type of filesystem) EXT4_SUPER_MAGIC
    nb_total_block = 0x1000
    nb_free_block = 0x100
    nb_avail_block = nb_free_block  # Available to unprivileged user
    nb_total_fnode = 100  # Total file nodes in filesystem
    nb_free_fnode = 50
    max_filename_len = 256
    fragment_size = 0
    mount_flags = 0

    def __init__(self, base_path, linux_env):
        self.base_path = base_path
        self.linux_env = linux_env
        # Paths (str / bytes) or compiled regexps returned without being
        # moved under base_path by `resolve_path`
        self.passthrough = []
        self.path_to_inode = {}  # Real path (post-resolution) -> inode number

    def resolve_path(self, path, follow_link=True):
        """Resolve @path to the corresponding sandboxed path

        @path: sandbox-side path, str or bytes; the result has the same type
        @follow_link: if the resolved path is a symbolic link, resolve its
        target too; otherwise the (normalized) link target is returned as
        a sandbox-side path

        A path matching an entry of `self.passthrough` is returned
        normalized, without being moved under the sandbox.
        """
        # path_bytes is used for Python 2 / Python 3 compatibility
        path_bytes = not isinstance(path, str)
        path_sep = os.path.sep.encode() if path_bytes else os.path.sep

        if path_bytes:
            def _convert(subpath):
                if not isinstance(subpath, str):
                    return subpath
                return subpath.encode()

            def _convert_re(expr):
                if isinstance(expr.pattern, str):
                    try:
                        return re.compile(
                            expr.pattern.encode(),
                            flags=expr.flags & ~re.UNICODE
                        )
                    except UnicodeEncodeError:
                        # Will never match
                        log.warning(
                            'Cannot convert regexp to bytes %r %r',
                            expr.pattern,
                            expr.flags,
                            exc_info=True,
                        )
                        return re.compile(b'$X')
                return expr
        else:
            def _convert(subpath):
                if not isinstance(subpath, str):
                    return subpath.decode()
                return subpath

            def _convert_re(expr):
                if not isinstance(expr.pattern, str):
                    try:
                        # Keep the flags of the original expression (ie.
                        # IGNORECASE); only LOCALE has to go, as it is
                        # refused with a str pattern
                        return re.compile(
                            expr.pattern.decode(),
                            flags=expr.flags & ~re.LOCALE
                        )
                    except UnicodeDecodeError:
                        # Will never match
                        log.warning(
                            'Cannot convert regexp to str %r %r',
                            expr.pattern,
                            expr.flags,
                            exc_info=True,
                        )
                        return re.compile('$X')
                return expr

        # Remove '../', etc.
        path = os.path.normpath(path)

        # Passthrough
        for passthrough in self.passthrough:
            if isinstance(passthrough, REGEXP_T):
                if _convert_re(passthrough).match(path):
                    return path
            elif _convert(passthrough) == path:
                return path

        # Anchor the path at the sandbox root before dropping the leading
        # '/': normpath keeps the leading '..' of a relative path, which
        # would otherwise climb above base_path once joined
        path = os.path.normpath(path_sep + path).lstrip(path_sep)

        base_path = os.path.abspath(_convert(self.base_path))
        out_path = os.path.join(base_path, path)
        assert out_path.startswith(base_path + path_sep)
        if os.path.islink(out_path):
            link_target = os.readlink(out_path)
            # Link can be absolute or relative -> absolute
            link = os.path.normpath(os.path.join(os.path.dirname(path), link_target))
            if follow_link:
                out_path = self.resolve_path(link)
            else:
                # NOTE(review): this is a sandbox-side path, not a real
                # one as in the other cases -- confirm it is intended
                out_path = link
        return out_path

    def get_path_inode(self, real_path):
        """Return the inode number of @real_path, a new one being allocated
        the first time a path is seen"""
        inode = self.path_to_inode.setdefault(real_path, len(self.path_to_inode))
        return inode

    def exists(self, path):
        """Return True if the sandbox-side @path exists"""
        sb_path = self.resolve_path(path)
        return os.path.exists(sb_path)

    def readlink(self, path):
        """Return the target of the link @path, None if the resolved path
        is not a symbolic link"""
        sb_path = self.resolve_path(path, follow_link=False)
        if not os.path.islink(sb_path):
            return None
        return os.readlink(sb_path)

    def statfs(self):
        """Return the StatFSInfo built from the class level constants"""
        return StatFSInfo(
            f_type=self.f_type,
            f_bsize=self.blocksize,
            f_blocks=self.nb_total_block,
            f_bfree=self.nb_free_block,
            f_bavail=self.nb_avail_block,
            f_files=self.nb_total_fnode,
            f_ffree=self.nb_free_fnode,
            f_fsid=self.device_id,
            f_namelen=self.max_filename_len,
            f_frsize=self.fragment_size,
            f_flags=self.mount_flags,
            f_spare=0)

    def getattr_(self, path, follow_link=True):
        """Return the stat information of @path, obtained by opening it
        read-only, calling fstat, then closing it"""
        sb_path = self.resolve_path(path, follow_link=follow_link)
        flags = self.linux_env.O_RDONLY
        if os.path.isdir(sb_path):
            flags |= self.linux_env.O_DIRECTORY
        fd = self.open_(path, flags, follow_link=follow_link)
        info = self.linux_env.fstat(fd)
        self.linux_env.close(fd)
        return info

    def open_(self, path, flags, follow_link=True):
        """Open the sandbox-side @path and register the new descriptor in
        the LinuxEnvironment

        Return the descriptor number, or -1 if the path does not exist.
        Only read-only accesses on directories and regular files are
        implemented, a RuntimeError being raised otherwise.
        """
        path = self.resolve_path(path, follow_link=follow_link)
        if not os.path.exists(path):
            # ENOENT (No such file or directory)
            return -1
        fd = self.linux_env.next_fd()
        acc_mode = flags & self.linux_env.O_ACCMODE

        if os.path.isdir(path):
            assert flags & self.linux_env.O_DIRECTORY == self.linux_env.O_DIRECTORY
            if acc_mode == self.linux_env.O_RDONLY:
                fdesc = FileDescriptorDirectory(fd, flags, self, path)
            else:
                raise RuntimeError("Not implemented")
        elif os.path.isfile(path):
            if acc_mode == os.O_RDONLY:
                # Read only
                real_fd = os.open(path, os.O_RDONLY)
            else:
                raise RuntimeError("Not implemented")
            fdesc = FileDescriptorRegularFile(fd, flags, self, real_fd)

        elif os.path.islink(path):
            raise RuntimeError("Not implemented")
        else:
            raise RuntimeError("Unknown file type for %r" % path)

        self.linux_env.file_descriptors[fd] = fdesc
        # Set stat info
        fdesc.cont_device_id = self.device_id
        fdesc.inode = self.get_path_inode(path)
        fdesc.uid = self.linux_env.user_uid
        fdesc.gid = self.linux_env.user_gid
        size = os.path.getsize(path)
        fdesc.size = size
        fdesc.blksize = self.blocksize
        # Number of 512 bytes blocks, rounded up
        fdesc.blocks = (size + ((512 - (size % 512)) % 512)) // 512
        return fd
Ancestors (in MRO)
- FileSystem
- builtins.object
Class variables
var blocksize
var device_id
var f_type
var fragment_size
var max_filename_len
var mount_flags
var nb_avail_block
var nb_free_block
var nb_free_fnode
var nb_total_block
var nb_total_fnode
Static methods
def __init__(
self, base_path, linux_env)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, base_path, linux_env): self.base_path = base_path self.linux_env = linux_env self.passthrough = [] self.path_to_inode = {} # Real path (post-resolution) -> inode number
def exists(
self, path)
def exists(self, path):
    """Return True if the sandbox-side @path exists once resolved"""
    return os.path.exists(self.resolve_path(path))
def get_path_inode(
self, real_path)
def get_path_inode(self, real_path):
    """Return the inode number associated with @real_path

    The first time a path is seen, it receives the current number of known
    paths as inode number; this number is then returned on later calls.
    """
    known = self.path_to_inode
    if real_path not in known:
        known[real_path] = len(known)
    return known[real_path]
def getattr_(
self, path, follow_link=True)
def getattr_(self, path, follow_link=True):
    """Return the stat information of the sandbox-side @path

    The path is opened read-only (with O_DIRECTORY when it resolves to a
    directory), fstat is called on the descriptor, which is then closed.

    @follow_link: forwarded to `resolve_path` and `open_`
    """
    resolved = self.resolve_path(path, follow_link=follow_link)
    env = self.linux_env
    open_flags = env.O_RDONLY
    if os.path.isdir(resolved):
        open_flags = open_flags | env.O_DIRECTORY
    fd = self.open_(path, open_flags, follow_link=follow_link)
    info = env.fstat(fd)
    env.close(fd)
    return info
def open_(
self, path, flags, follow_link=True)
def open_(self, path, flags, follow_link=True):
    """Open the sandbox-side @path and register the resulting descriptor

    @flags: open flags, the access mode being extracted with O_ACCMODE
    @follow_link: forwarded to `resolve_path`

    Return the new descriptor number, or -1 if the resolved path does not
    exist. Only read-only accesses on directories and regular files are
    implemented, a RuntimeError being raised for anything else.
    """
    env = self.linux_env
    real_path = self.resolve_path(path, follow_link=follow_link)
    if not os.path.exists(real_path):
        # ENOENT (No such file or directory)
        return -1

    fd = env.next_fd()
    access = flags & env.O_ACCMODE

    if os.path.isdir(real_path):
        assert (flags & env.O_DIRECTORY) == env.O_DIRECTORY
        if access != env.O_RDONLY:
            raise RuntimeError("Not implemented")
        fdesc = FileDescriptorDirectory(fd, flags, self, real_path)
    elif os.path.isfile(real_path):
        if access != os.O_RDONLY:
            raise RuntimeError("Not implemented")
        # Read only
        real_fd = os.open(real_path, os.O_RDONLY)
        fdesc = FileDescriptorRegularFile(fd, flags, self, real_fd)
    elif os.path.islink(real_path):
        raise RuntimeError("Not implemented")
    else:
        raise RuntimeError("Unknown file type for %r" % real_path)

    env.file_descriptors[fd] = fdesc

    # Fill the information later reported by stat
    fdesc.cont_device_id = self.device_id
    fdesc.inode = self.get_path_inode(real_path)
    fdesc.uid = env.user_uid
    fdesc.gid = env.user_gid
    byte_size = os.path.getsize(real_path)
    fdesc.size = byte_size
    fdesc.blksize = self.blocksize
    # Number of 512 bytes blocks, rounded up
    fdesc.blocks = (byte_size + 511) // 512
    return fd
def readlink(
self, path)
def readlink(self, path):
    """Return the target of the symbolic link @path

    The path is resolved without following links; None is returned when the
    result is not a symbolic link.
    """
    link_path = self.resolve_path(path, follow_link=False)
    return os.readlink(link_path) if os.path.islink(link_path) else None
def resolve_path(
self, path, follow_link=True)
Resolve @path to the corresponding sandboxed path
def resolve_path(self, path, follow_link=True): """Resolve @path to the corresponding sandboxed path""" # path_bytes is used for Python 2 / Python 3 compatibility path_bytes = not isinstance(path, str) path_sep = os.path.sep.encode() if path_bytes else os.path.sep if path_bytes: def _convert(subpath): if not isinstance(subpath, str): return subpath return subpath.encode() def _convert_re(expr): if isinstance(expr.pattern, str): try: return re.compile( expr.pattern.encode(), flags=expr.flags & ~re.UNICODE ) except UnicodeEncodeError: # Will never match log.warning( 'Cannot convert regexp to bytes %r %r', expr.pattern, expr.flags, exc_info=True, ) return re.compile(b'$X') return expr else: def _convert(subpath): if not isinstance(subpath, str): return subpath.decode() return subpath def _convert_re(expr): if not isinstance(expr.pattern, str): try: return re.compile( expr.pattern.decode(), flags=expr.flags & re.UNICODE ) except UnicodeDecodeError: # Will never match log.warning( 'Cannot convert regexp to str %r %r', expr.pattern, expr.flags, exc_info=True, ) return re.compile('$X') return expr # Remove '../', etc. path = os.path.normpath(path) # Passthrough for passthrough in self.passthrough: if isinstance(passthrough, REGEXP_T): if _convert_re(passthrough).match(path): return path elif _convert(passthrough) == path: return path # Remove leading '/' if any path = path.lstrip(path_sep) base_path = os.path.abspath(_convert(self.base_path)) out_path = os.path.join(base_path, path) assert out_path.startswith(base_path + path_sep) if os.path.islink(out_path): link_target = os.readlink(out_path) # Link can be absolute or relative -> absolute link = os.path.normpath(os.path.join(os.path.dirname(path), link_target)) if follow_link: out_path = self.resolve_path(link) else: out_path = link return out_path
def statfs(
self)
def statfs(self):
    """Return the StatFSInfo describing this filesystem

    Every field comes from the attribute of the same meaning, f_spare
    being always 0.
    """
    fields = {
        "f_type": self.f_type,
        "f_bsize": self.blocksize,
        "f_blocks": self.nb_total_block,
        "f_bfree": self.nb_free_block,
        "f_bavail": self.nb_avail_block,
        "f_files": self.nb_total_fnode,
        "f_ffree": self.nb_free_fnode,
        "f_fsid": self.device_id,
        "f_namelen": self.max_filename_len,
        "f_frsize": self.fragment_size,
        "f_flags": self.mount_flags,
        "f_spare": 0,
    }
    return StatFSInfo(**fields)
Instance variables
var base_path
var linux_env
var passthrough
var path_to_inode
class LinuxEnvironment
A LinuxEnvironment groups together the information needed to simulate a Linux-like environment
class LinuxEnvironment(object):
    """A LinuxEnvironment regroups information to simulate a Linux-like
    environment"""

    # To be overridden
    platform_arch = None

    # User information
    user_uid = 1000
    user_euid = 1000
    user_gid = 1000
    user_egid = 1000
    user_name = b"user"

    # Memory mapping information
    brk_current = 0x74000000
    mmap_current = 0x75000000

    # System information
    sys_sysname = b"Linux"
    sys_nodename = b"user-pc"
    sys_release = b"4.13.0-19-generic"
    sys_version = b"#22-Ubuntu"
    sys_machine = None

    # Filesystem
    filesystem_base = "file_sb"
    file_descriptors = None

    # Current process
    process_tid = 1000
    process_pid = 1000

    # Syscall restrictions
    ioctl_allowed = None # list of (fd, cmd), None value for wildcard
    ioctl_disallowed = None # list of (fd, cmd), None value for wildcard

    # Time
    base_time = 1531900000

    # Arch specific constant
    O_ACCMODE = None
    O_CLOEXEC = None
    O_DIRECTORY = None
    O_LARGEFILE = None
    O_NONBLOCK = None
    O_RDONLY = None

    def __init__(self):
        """Create the standard streams, the default ioctl policy, the
        sandboxed filesystem and the network stub"""
        stdin = FileDescriptorSTDIN(0)
        stdout = FileDescriptorSTDOUT(1)
        stderr = FileDescriptorSTDERR(2)
        # The standard streams are owned by the simulated user
        for std in [stdin, stdout, stderr]:
            std.uid = self.user_uid
            std.gid = self.user_gid
        self.file_descriptors = {
            0: stdin,
            1: stdout,
            2: stderr,
        }
        self.ioctl_allowed = [
            (0, termios.TCGETS),
            (0, termios.TIOCGWINSZ),
            (0, termios.TIOCSWINSZ),
            (1, termios.TCGETS),
            (1, termios.TIOCGWINSZ),
            (1, termios.TIOCSWINSZ),
        ]
        self.ioctl_disallowed = [
            (2, termios.TCGETS),
            (0, termios.TCSETSW),
        ]
        self.filesystem = FileSystem(self.filesystem_base, self)
        self.network = Networking(self)

    def next_fd(self):
        """Return the lowest descriptor number missing from
        self.file_descriptors

        The number of entries is not a safe answer: as soon as the keys are
        not contiguous it can designate a descriptor already in use.
        """
        fd = 0
        while fd in self.file_descriptors:
            fd += 1
        return fd

    def clock_gettime(self):
        """Return the current simulated time and advance it by one"""
        out = self.base_time
        self.base_time += 1
        return out

    def open_(self, path, flags, follow_link=True):
        """Stub for 'open' syscall"""
        return self.filesystem.open_(path, flags, follow_link=follow_link)

    def socket(self, family, type_, protocol):
        """Stub for 'socket' syscall"""
        return self.network.socket(family, type_, protocol)

    def fstat(self, fd):
        """Get file status through fd

        Return None if @fd is unknown
        """
        fdesc = self.file_descriptors.get(fd)
        if fdesc is None:
            return None
        return fdesc.stat()

    def stat(self, path):
        """Get file status through path"""
        return self.filesystem.getattr_(path)

    def lstat(self, path):
        """Get file status through path (not following links)"""
        return self.filesystem.getattr_(path, follow_link=False)

    def close(self, fd):
        """Stub for 'close' syscall

        Return None if @fd is unknown
        """
        fdesc = self.file_descriptors.get(fd)
        if fdesc is None:
            return None
        return fdesc.close()

    def write(self, fd, data):
        """Stub for 'write' syscall

        Return len(@data), or None if @fd is unknown
        """
        fdesc = self.file_descriptors.get(fd)
        if fdesc is None:
            return None
        fdesc.write(data)
        return len(data)

    def read(self, fd, count):
        """Stub for 'read' syscall

        Return None if @fd is unknown
        """
        fdesc = self.file_descriptors.get(fd)
        if fdesc is None:
            return None
        return fdesc.read(count)

    def getdents(self, fd, count, packing_callback):
        """Stub for 'getdents' syscall

        'getdents64' must be handled by caller (only the structure layout is
        modified)

        @fd: getdents' fd argument
        @count: getdents' count argument
        @packing_callback(cur_len, d_ino, d_type, name) -> entry
        """
        fdesc = self.file_descriptors[fd]
        if not isinstance(fdesc, FileDescriptorDirectory):
            raise RuntimeError("Not implemented")

        out = b""
        # fdesc.listdir continues from where it stopped
        for name in fdesc.listdir():
            d_ino = 1 # Not the real one
            d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly
                       # handle a return of DT_UNKNOWN.")
            entry = packing_callback(len(out), d_ino, d_type, name)

            if len(out) + len(entry) > count:
                # Report to a further call
                fdesc.cur_listdir.append(name)
                break
            out = out + entry
        return out

    def ioctl(self, fd, cmd, arg):
        """Stub for 'ioctl' syscall
        Return the list of element to pack back depending on target ioctl
        If the ioctl is disallowed, return False
        """
        allowed = False
        disallowed = False
        # None acts as a wildcard on either side of a (fd, cmd) rule
        for test in [(fd, cmd), (None, cmd), (fd, None)]:
            if test in self.ioctl_allowed:
                allowed = True
            if test in self.ioctl_disallowed:
                disallowed = True

        if allowed and disallowed:
            raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd))

        if allowed:
            if cmd == termios.TCGETS:
                return 0, 0, 0, 0
            elif cmd == termios.TIOCGWINSZ:
                # struct winsize
                # {
                #   unsigned short ws_row;	/* rows, in characters */
                #   unsigned short ws_col;	/* columns, in characters */
                #   unsigned short ws_xpixel;	/* horizontal size, pixels */
                #   unsigned short ws_ypixel;	/* vertical size, pixels */
                # };
                return 1000, 360, 1000, 1000
            elif cmd == termios.TIOCSWINSZ:
                # Ignore it
                return
            else:
                raise RuntimeError("Not implemented")

        elif disallowed:
            return False

        else:
            raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd))

    def mmap(self, addr, len_, prot, flags, fd, off, vmmngr):
        """Stub for 'mmap' syscall

        'mmap2' must be implemented by calling this function with off * 4096

        @prot is not used: pages are always added as PAGE_READ|PAGE_WRITE.
        Return the address the data has been written to.
        """
        if addr == 0:
            addr = self.mmap_current
            self.mmap_current += (len_ + 0x1000) & ~0xfff

        all_mem = vmmngr.get_all_memory()
        mapped = interval(
            [
                (start, start + info["size"] - 1)
                for start, info in viewitems(all_mem)
            ]
        )

        MAP_FIXED = 0x10
        if flags & MAP_FIXED:
            # Alloc missing and override
            missing = interval([(addr, addr + len_ - 1)]) - mapped
            for start, stop in missing:
                vmmngr.add_memory_page(
                    start,
                    PAGE_READ|PAGE_WRITE,
                    b"\x00" * (stop - start + 1),
                    "mmap allocated"
                )
        else:
            # Find first candidate segment nearby addr
            for start, stop in mapped:
                if stop < addr:
                    continue
                rounded = (stop + 1 + 0x1000) & ~0xfff
                if (interval([(rounded, rounded + len_)]) & mapped).empty:
                    addr = rounded
                    break
            else:
                assert (interval([(addr, addr + len_)]) & mapped).empty

            vmmngr.add_memory_page(
                addr,
                PAGE_READ|PAGE_WRITE,
                b"\x00" * len_,
                "mmap allocated"
            )

        if fd == 0xffffffff:
            MAP_ANONYMOUS = 0x20    # mman.h
            # fd and offset are ignored if MAP_ANONYMOUS flag is present
            if not(flags & MAP_ANONYMOUS) and off != 0:
                raise RuntimeError("Not implemented")
            data = b"\x00" * len_
        else:
            fdesc = self.file_descriptors[fd]
            # Read the content without moving the descriptor's position
            cur_pos = fdesc.tell()
            fdesc.seek(off)
            data = fdesc.read(len_)
            fdesc.seek(cur_pos)

        vmmngr.set_mem(addr, data)
        return addr

    def brk(self, addr, vmmngr):
        """Stub for 'brk' syscall

        @addr == 0 only queries the current break
        """
        if addr == 0:
            addr = self.brk_current
        else:
            all_mem = vmmngr.get_all_memory()
            mapped = interval(
                [
                    (start, start + info["size"] - 1)
                    for start, info in viewitems(all_mem)
                ]
            )

            # Alloc missing and override
            missing = interval([(self.brk_current, addr)]) - mapped
            for start, stop in missing:
                vmmngr.add_memory_page(
                    start,
                    PAGE_READ|PAGE_WRITE,
                    b"\x00" * (stop - start + 1),
                    "BRK"
                )

            self.brk_current = addr
        return addr
Ancestors (in MRO)
- LinuxEnvironment
- builtins.object
Class variables
var O_ACCMODE
var O_CLOEXEC
var O_DIRECTORY
var O_LARGEFILE
var O_NONBLOCK
var O_RDONLY
var base_time
var brk_current
var file_descriptors
var filesystem_base
var ioctl_allowed
var ioctl_disallowed
var mmap_current
var platform_arch
var process_pid
var process_tid
var sys_machine
var sys_nodename
var sys_release
var sys_sysname
var sys_version
var user_egid
var user_euid
var user_gid
var user_name
var user_uid
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create the standard streams, the default ioctl policy, the sandboxed
    filesystem and the network stub"""
    std_streams = (
        FileDescriptorSTDIN(0),
        FileDescriptorSTDOUT(1),
        FileDescriptorSTDERR(2),
    )
    # The standard streams are owned by the simulated user
    for stream in std_streams:
        stream.uid = self.user_uid
        stream.gid = self.user_gid
    self.file_descriptors = dict(enumerate(std_streams))

    # The same three terminal requests are accepted on descriptors 0 and 1
    tty_cmds = (termios.TCGETS, termios.TIOCGWINSZ, termios.TIOCSWINSZ)
    self.ioctl_allowed = [(fd, cmd) for fd in (0, 1) for cmd in tty_cmds]
    self.ioctl_disallowed = [
        (2, termios.TCGETS),
        (0, termios.TCSETSW),
    ]

    self.filesystem = FileSystem(self.filesystem_base, self)
    self.network = Networking(self)
def brk(
self, addr, vmmngr)
Stub for 'brk' syscall
def brk(self, addr, vmmngr):
    """Stub for 'brk' syscall

    @addr: requested break; 0 only queries the current one
    @vmmngr: memory manager receiving the missing pages
    Return the resulting break
    """
    if addr == 0:
        # Query only: nothing is allocated
        return self.brk_current

    used = interval(
        [
            (base, base + page["size"] - 1)
            for base, page in viewitems(vmmngr.get_all_memory())
        ]
    )

    # Alloc missing and override
    for first, last in interval([(self.brk_current, addr)]) - used:
        vmmngr.add_memory_page(
            first,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * (last - first + 1),
            "BRK"
        )

    self.brk_current = addr
    return addr
def clock_gettime(
self)
def clock_gettime(self):
    """Return the current simulated time and advance it by one"""
    now = self.base_time
    self.base_time = now + 1
    return now
def close(
self, fd)
Stub for 'close' syscall
def close(self, fd):
    """Stub for 'close' syscall

    Return the result of the descriptor's close(), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.close()
def fstat(
self, fd)
Get file status through fd
def fstat(self, fd):
    """Get file status through fd

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        return fdesc.stat()
    return None
def getdents(
self, fd, count, packing_callback)
Stub for 'getdents' syscall
'getdents64' must be handled by caller (only the structure layout is modified)
@fd: getdents' fd argument @count: getdents' count argument @packing_callback(cur_len, d_ino, d_type, name) -> entry
def getdents(self, fd, count, packing_callback):
    """Stub for 'getdents' syscall

    'getdents64' must be handled by caller (only the structure layout is
    modified)

    @fd: getdents' fd argument
    @count: getdents' count argument
    @packing_callback(cur_len, d_ino, d_type, name) -> entry
    """
    fdesc = self.file_descriptors[fd]
    if not isinstance(fdesc, FileDescriptorDirectory):
        raise RuntimeError("Not implemented")

    d_ino = 1 # Not the real one
    d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly
               # handle a return of DT_UNKNOWN.")
    entries = []
    used = 0
    # fdesc.listdir continues from where it stopped
    for name in fdesc.listdir():
        entry = packing_callback(used, d_ino, d_type, name)
        if used + len(entry) > count:
            # Report to a further call
            fdesc.cur_listdir.append(name)
            break
        entries.append(entry)
        used += len(entry)
    return b"".join(entries)
def ioctl(
self, fd, cmd, arg)
Stub for 'ioctl' syscall. Return the list of elements to pack back, depending on the target ioctl. If the ioctl is disallowed, return False.
def ioctl(self, fd, cmd, arg):
    """Stub for 'ioctl' syscall
    Return the list of element to pack back depending on target ioctl
    If the ioctl is disallowed, return False

    Raise ValueError if the request is both allowed and disallowed, KeyError
    if no rule matches, RuntimeError if an allowed command has no handler.
    @arg is not used.
    """
    # None acts as a wildcard on either side of a (fd, cmd) rule
    candidates = [(fd, cmd), (None, cmd), (fd, None)]
    allowed = any(rule in self.ioctl_allowed for rule in candidates)
    disallowed = any(rule in self.ioctl_disallowed for rule in candidates)

    if allowed and disallowed:
        raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd))
    if disallowed:
        return False
    if not allowed:
        raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd))

    if cmd == termios.TCGETS:
        return 0, 0, 0, 0
    if cmd == termios.TIOCGWINSZ:
        # struct winsize
        # {
        #   unsigned short ws_row;	/* rows, in characters */
        #   unsigned short ws_col;	/* columns, in characters */
        #   unsigned short ws_xpixel;	/* horizontal size, pixels */
        #   unsigned short ws_ypixel;	/* vertical size, pixels */
        # };
        return 1000, 360, 1000, 1000
    if cmd == termios.TIOCSWINSZ:
        # Ignore it
        return None
    raise RuntimeError("Not implemented")
def lstat(
self, path)
Get file status through path (not following links)
def lstat(self, path):
    """Get file status through path (not following links)"""
    filesystem = self.filesystem
    return filesystem.getattr_(path, follow_link=False)
def mmap(
self, addr, len_, prot, flags, fd, off, vmmngr)
Stub for 'mmap' syscall
'mmap2' must be implemented by calling this function with off * 4096
def mmap(self, addr, len_, prot, flags, fd, off, vmmngr):
    """Stub for 'mmap' syscall

    'mmap2' must be implemented by calling this function with off * 4096

    @addr: requested address; 0 selects self.mmap_current
    @len_: length of the mapping
    @prot: not used, pages are always added as PAGE_READ|PAGE_WRITE
    @flags: only MAP_FIXED (0x10) and MAP_ANONYMOUS (0x20) are inspected
    @fd: descriptor providing the content; 0xffffffff stands for "no file"
    @off: offset of the content in the file
    @vmmngr: memory manager receiving the pages and the data
    Return the address the data has been written to
    """
    if addr == 0:
        addr = self.mmap_current
        # Reserve the area for the next address-less request
        self.mmap_current += (len_ + 0x1000) & ~0xfff

    all_mem = vmmngr.get_all_memory()
    # Inclusive (first, last) ranges of the memory already mapped
    mapped = interval(
        [
            (start, start + info["size"] - 1)
            for start, info in viewitems(all_mem)
        ]
    )

    MAP_FIXED = 0x10
    if flags & MAP_FIXED:
        # Alloc missing and override
        missing = interval([(addr, addr + len_ - 1)]) - mapped
        for start, stop in missing:
            vmmngr.add_memory_page(
                start,
                PAGE_READ|PAGE_WRITE,
                b"\x00" * (stop - start + 1),
                "mmap allocated"
            )
    else:
        # Find first candidate segment nearby addr
        for start, stop in mapped:
            if stop < addr:
                continue
            # Candidate: page-aligned address past the end of this segment
            rounded = (stop + 1 + 0x1000) & ~0xfff
            if (interval([(rounded, rounded + len_)]) & mapped).empty:
                addr = rounded
                break
        else:
            # No candidate has been selected: addr itself must be free
            assert (interval([(addr, addr + len_)]) & mapped).empty

        vmmngr.add_memory_page(
            addr,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * len_,
            "mmap allocated"
        )

    if fd == 0xffffffff:
        MAP_ANONYMOUS = 0x20    # mman.h
        # fd and offset are ignored if MAP_ANONYMOUS flag is present
        if not(flags & MAP_ANONYMOUS) and off != 0:
            raise RuntimeError("Not implemented")
        data = b"\x00" * len_
    else:
        fdesc = self.file_descriptors[fd]
        # Read the content, then restore the descriptor's position
        cur_pos = fdesc.tell()
        fdesc.seek(off)
        data = fdesc.read(len_)
        fdesc.seek(cur_pos)

    vmmngr.set_mem(addr, data)
    return addr
def next_fd(
self)
def next_fd(self):
    """Return the lowest descriptor number missing from
    self.file_descriptors

    The number of entries is not a safe answer: as soon as the keys are not
    contiguous it can designate a descriptor already in use.
    """
    fd = 0
    while fd in self.file_descriptors:
        fd += 1
    return fd
def open_(
self, path, flags, follow_link=True)
Stub for 'open' syscall
def open_(self, path, flags, follow_link=True):
    """Stub for 'open' syscall"""
    filesystem = self.filesystem
    return filesystem.open_(path, flags, follow_link=follow_link)
def read(
self, fd, count)
Stub for 'read' syscall
def read(self, fd, count):
    """Stub for 'read' syscall

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.read(count)
def socket(
self, family, type_, protocol)
Stub for 'socket' syscall
def socket(self, family, type_, protocol):
    """Stub for 'socket' syscall"""
    network = self.network
    return network.socket(family, type_, protocol)
def stat(
self, path)
Get file status through path
def stat(self, path):
    """Get file status through path"""
    filesystem = self.filesystem
    return filesystem.getattr_(path)
def write(
self, fd, data)
Stub for 'write' syscall
def write(self, fd, data):
    """Stub for 'write' syscall

    Return len(@data), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        fdesc.write(data)
        return len(data)
    return None
Instance variables
var file_descriptors
var filesystem
var ioctl_allowed
var ioctl_disallowed
var network
class LinuxEnvironment_arml
A LinuxEnvironment groups together the information needed to simulate a Linux-like environment
class LinuxEnvironment_arml(LinuxEnvironment):
    """LinuxEnvironment specialization for the "arml" platform"""

    platform_arch = sys_machine = b"arml"

    # open(2) flag values used on this platform
    O_RDONLY = 0
    O_ACCMODE = 0x3
    O_NONBLOCK = 0x800
    O_DIRECTORY = 0x4000
    O_LARGEFILE = 0x20000
    O_CLOEXEC = 0x80000

    # ARM specific
    tls = 0
    # get_tls: __kuser_helper_version >= 1
    # cmpxchg: __kuser_helper_version >= 2
    # memory_barrier: __kuser_helper_version >= 3
    kuser_helper_version = 3
Ancestors (in MRO)
- LinuxEnvironment_arml
- LinuxEnvironment
- builtins.object
Class variables
var O_ACCMODE
var O_CLOEXEC
var O_DIRECTORY
var O_LARGEFILE
var O_NONBLOCK
var O_RDONLY
var base_time
var brk_current
var file_descriptors
var filesystem_base
var ioctl_allowed
var ioctl_disallowed
var kuser_helper_version
var mmap_current
var process_pid
var process_tid
var sys_machine
var sys_nodename
var sys_release
var sys_sysname
var sys_version
var tls
var user_egid
var user_euid
var user_gid
var user_name
var user_uid
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create the standard streams, the default ioctl policy, the sandboxed
    filesystem and the network stub"""
    std_streams = (
        FileDescriptorSTDIN(0),
        FileDescriptorSTDOUT(1),
        FileDescriptorSTDERR(2),
    )
    # The standard streams are owned by the simulated user
    for stream in std_streams:
        stream.uid = self.user_uid
        stream.gid = self.user_gid
    self.file_descriptors = dict(enumerate(std_streams))

    # The same three terminal requests are accepted on descriptors 0 and 1
    tty_cmds = (termios.TCGETS, termios.TIOCGWINSZ, termios.TIOCSWINSZ)
    self.ioctl_allowed = [(fd, cmd) for fd in (0, 1) for cmd in tty_cmds]
    self.ioctl_disallowed = [
        (2, termios.TCGETS),
        (0, termios.TCSETSW),
    ]

    self.filesystem = FileSystem(self.filesystem_base, self)
    self.network = Networking(self)
def brk(
self, addr, vmmngr)
Stub for 'brk' syscall
def brk(self, addr, vmmngr):
    """Stub for 'brk' syscall

    @addr: requested break; 0 only queries the current one
    @vmmngr: memory manager receiving the missing pages
    Return the resulting break
    """
    if addr == 0:
        # Query only: nothing is allocated
        return self.brk_current

    used = interval(
        [
            (base, base + page["size"] - 1)
            for base, page in viewitems(vmmngr.get_all_memory())
        ]
    )

    # Alloc missing and override
    for first, last in interval([(self.brk_current, addr)]) - used:
        vmmngr.add_memory_page(
            first,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * (last - first + 1),
            "BRK"
        )

    self.brk_current = addr
    return addr
def clock_gettime(
self)
def clock_gettime(self):
    """Return the current simulated time and advance it by one"""
    now = self.base_time
    self.base_time = now + 1
    return now
def close(
self, fd)
Stub for 'close' syscall
def close(self, fd):
    """Stub for 'close' syscall

    Return the result of the descriptor's close(), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.close()
def fstat(
self, fd)
Get file status through fd
def fstat(self, fd):
    """Get file status through fd

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        return fdesc.stat()
    return None
def getdents(
self, fd, count, packing_callback)
Stub for 'getdents' syscall
'getdents64' must be handled by caller (only the structure layout is modified)
@fd: getdents' fd argument @count: getdents' count argument @packing_callback(cur_len, d_ino, d_type, name) -> entry
def getdents(self, fd, count, packing_callback):
    """Stub for 'getdents' syscall

    'getdents64' must be handled by caller (only the structure layout is
    modified)

    @fd: getdents' fd argument
    @count: getdents' count argument
    @packing_callback(cur_len, d_ino, d_type, name) -> entry
    """
    fdesc = self.file_descriptors[fd]
    if not isinstance(fdesc, FileDescriptorDirectory):
        raise RuntimeError("Not implemented")

    d_ino = 1 # Not the real one
    d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly
               # handle a return of DT_UNKNOWN.")
    entries = []
    used = 0
    # fdesc.listdir continues from where it stopped
    for name in fdesc.listdir():
        entry = packing_callback(used, d_ino, d_type, name)
        if used + len(entry) > count:
            # Report to a further call
            fdesc.cur_listdir.append(name)
            break
        entries.append(entry)
        used += len(entry)
    return b"".join(entries)
def ioctl(
self, fd, cmd, arg)
Stub for 'ioctl' syscall. Return the list of elements to pack back, depending on the target ioctl. If the ioctl is disallowed, return False.
def ioctl(self, fd, cmd, arg):
    """Stub for 'ioctl' syscall
    Return the list of element to pack back depending on target ioctl
    If the ioctl is disallowed, return False

    Raise ValueError if the request is both allowed and disallowed, KeyError
    if no rule matches, RuntimeError if an allowed command has no handler.
    @arg is not used.
    """
    # None acts as a wildcard on either side of a (fd, cmd) rule
    candidates = [(fd, cmd), (None, cmd), (fd, None)]
    allowed = any(rule in self.ioctl_allowed for rule in candidates)
    disallowed = any(rule in self.ioctl_disallowed for rule in candidates)

    if allowed and disallowed:
        raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd))
    if disallowed:
        return False
    if not allowed:
        raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd))

    if cmd == termios.TCGETS:
        return 0, 0, 0, 0
    if cmd == termios.TIOCGWINSZ:
        # struct winsize
        # {
        #   unsigned short ws_row;	/* rows, in characters */
        #   unsigned short ws_col;	/* columns, in characters */
        #   unsigned short ws_xpixel;	/* horizontal size, pixels */
        #   unsigned short ws_ypixel;	/* vertical size, pixels */
        # };
        return 1000, 360, 1000, 1000
    if cmd == termios.TIOCSWINSZ:
        # Ignore it
        return None
    raise RuntimeError("Not implemented")
def lstat(
self, path)
Get file status through path (not following links)
def lstat(self, path):
    """Get file status through path (not following links)"""
    filesystem = self.filesystem
    return filesystem.getattr_(path, follow_link=False)
def mmap(
self, addr, len_, prot, flags, fd, off, vmmngr)
Stub for 'mmap' syscall
'mmap2' must be implemented by calling this function with off * 4096
def mmap(self, addr, len_, prot, flags, fd, off, vmmngr):
    """Stub for 'mmap' syscall

    'mmap2' must be implemented by calling this function with off * 4096

    @addr: requested address; 0 selects self.mmap_current
    @len_: length of the mapping
    @prot: not used, pages are always added as PAGE_READ|PAGE_WRITE
    @flags: only MAP_FIXED (0x10) and MAP_ANONYMOUS (0x20) are inspected
    @fd: descriptor providing the content; 0xffffffff stands for "no file"
    @off: offset of the content in the file
    @vmmngr: memory manager receiving the pages and the data
    Return the address the data has been written to
    """
    if addr == 0:
        addr = self.mmap_current
        # Reserve the area for the next address-less request
        self.mmap_current += (len_ + 0x1000) & ~0xfff

    all_mem = vmmngr.get_all_memory()
    # Inclusive (first, last) ranges of the memory already mapped
    mapped = interval(
        [
            (start, start + info["size"] - 1)
            for start, info in viewitems(all_mem)
        ]
    )

    MAP_FIXED = 0x10
    if flags & MAP_FIXED:
        # Alloc missing and override
        missing = interval([(addr, addr + len_ - 1)]) - mapped
        for start, stop in missing:
            vmmngr.add_memory_page(
                start,
                PAGE_READ|PAGE_WRITE,
                b"\x00" * (stop - start + 1),
                "mmap allocated"
            )
    else:
        # Find first candidate segment nearby addr
        for start, stop in mapped:
            if stop < addr:
                continue
            # Candidate: page-aligned address past the end of this segment
            rounded = (stop + 1 + 0x1000) & ~0xfff
            if (interval([(rounded, rounded + len_)]) & mapped).empty:
                addr = rounded
                break
        else:
            # No candidate has been selected: addr itself must be free
            assert (interval([(addr, addr + len_)]) & mapped).empty

        vmmngr.add_memory_page(
            addr,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * len_,
            "mmap allocated"
        )

    if fd == 0xffffffff:
        MAP_ANONYMOUS = 0x20    # mman.h
        # fd and offset are ignored if MAP_ANONYMOUS flag is present
        if not(flags & MAP_ANONYMOUS) and off != 0:
            raise RuntimeError("Not implemented")
        data = b"\x00" * len_
    else:
        fdesc = self.file_descriptors[fd]
        # Read the content, then restore the descriptor's position
        cur_pos = fdesc.tell()
        fdesc.seek(off)
        data = fdesc.read(len_)
        fdesc.seek(cur_pos)

    vmmngr.set_mem(addr, data)
    return addr
def next_fd(
self)
def next_fd(self):
    """Return the lowest descriptor number missing from
    self.file_descriptors

    The number of entries is not a safe answer: as soon as the keys are not
    contiguous it can designate a descriptor already in use.
    """
    fd = 0
    while fd in self.file_descriptors:
        fd += 1
    return fd
def open_(
self, path, flags, follow_link=True)
Stub for 'open' syscall
def open_(self, path, flags, follow_link=True):
    """Stub for 'open' syscall"""
    filesystem = self.filesystem
    return filesystem.open_(path, flags, follow_link=follow_link)
def read(
self, fd, count)
Stub for 'read' syscall
def read(self, fd, count):
    """Stub for 'read' syscall

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.read(count)
def socket(
self, family, type_, protocol)
Stub for 'socket' syscall
def socket(self, family, type_, protocol):
    """Stub for 'socket' syscall"""
    network = self.network
    return network.socket(family, type_, protocol)
def stat(
self, path)
Get file status through path
def stat(self, path):
    """Get file status through path"""
    filesystem = self.filesystem
    return filesystem.getattr_(path)
def write(
self, fd, data)
Stub for 'write' syscall
def write(self, fd, data):
    """Stub for 'write' syscall

    Return len(@data), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        fdesc.write(data)
        return len(data)
    return None
class LinuxEnvironment_x86_32
A LinuxEnvironment groups together the information needed to simulate a Linux-like environment
class LinuxEnvironment_x86_32(LinuxEnvironment):
    """LinuxEnvironment specialization for the "x86_32" platform"""

    platform_arch = b"x86_32"
    sys_machine = b"x86_32"

    # open(2) flag values. They were left to None (inherited), which breaks
    # any test of the form `flags & O_xxx`. 32-bit x86 uses the generic Linux
    # values, i.e. the same ones as x86_64
    O_ACCMODE = 0x3
    O_CLOEXEC = 0x80000
    O_DIRECTORY = 0x10000
    O_LARGEFILE = 0x8000
    O_NONBLOCK = 0x800
    O_RDONLY = 0
Ancestors (in MRO)
- LinuxEnvironment_x86_32
- LinuxEnvironment
- builtins.object
Class variables
var O_ACCMODE
var O_CLOEXEC
var O_DIRECTORY
var O_LARGEFILE
var O_NONBLOCK
var O_RDONLY
var base_time
var brk_current
var file_descriptors
var filesystem_base
var ioctl_allowed
var ioctl_disallowed
var mmap_current
var process_pid
var process_tid
var sys_machine
var sys_nodename
var sys_release
var sys_sysname
var sys_version
var user_egid
var user_euid
var user_gid
var user_name
var user_uid
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create the standard streams, the default ioctl policy, the sandboxed
    filesystem and the network stub"""
    std_streams = (
        FileDescriptorSTDIN(0),
        FileDescriptorSTDOUT(1),
        FileDescriptorSTDERR(2),
    )
    # The standard streams are owned by the simulated user
    for stream in std_streams:
        stream.uid = self.user_uid
        stream.gid = self.user_gid
    self.file_descriptors = dict(enumerate(std_streams))

    # The same three terminal requests are accepted on descriptors 0 and 1
    tty_cmds = (termios.TCGETS, termios.TIOCGWINSZ, termios.TIOCSWINSZ)
    self.ioctl_allowed = [(fd, cmd) for fd in (0, 1) for cmd in tty_cmds]
    self.ioctl_disallowed = [
        (2, termios.TCGETS),
        (0, termios.TCSETSW),
    ]

    self.filesystem = FileSystem(self.filesystem_base, self)
    self.network = Networking(self)
def brk(
self, addr, vmmngr)
Stub for 'brk' syscall
def brk(self, addr, vmmngr):
    """Stub for 'brk' syscall

    @addr: requested break; 0 only queries the current one
    @vmmngr: memory manager receiving the missing pages
    Return the resulting break
    """
    if addr == 0:
        # Query only: nothing is allocated
        return self.brk_current

    used = interval(
        [
            (base, base + page["size"] - 1)
            for base, page in viewitems(vmmngr.get_all_memory())
        ]
    )

    # Alloc missing and override
    for first, last in interval([(self.brk_current, addr)]) - used:
        vmmngr.add_memory_page(
            first,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * (last - first + 1),
            "BRK"
        )

    self.brk_current = addr
    return addr
def clock_gettime(
self)
def clock_gettime(self):
    """Return the current simulated time and advance it by one"""
    now = self.base_time
    self.base_time = now + 1
    return now
def close(
self, fd)
Stub for 'close' syscall
def close(self, fd):
    """Stub for 'close' syscall

    Return the result of the descriptor's close(), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.close()
def fstat(
self, fd)
Get file status through fd
def fstat(self, fd):
    """Get file status through fd

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        return fdesc.stat()
    return None
def getdents(
self, fd, count, packing_callback)
Stub for 'getdents' syscall
'getdents64' must be handled by caller (only the structure layout is modified)
@fd: getdents' fd argument @count: getdents' count argument @packing_callback(cur_len, d_ino, d_type, name) -> entry
def getdents(self, fd, count, packing_callback):
    """Stub for 'getdents' syscall

    'getdents64' must be handled by caller (only the structure layout is
    modified)

    @fd: getdents' fd argument
    @count: getdents' count argument
    @packing_callback(cur_len, d_ino, d_type, name) -> entry
    """
    fdesc = self.file_descriptors[fd]
    if not isinstance(fdesc, FileDescriptorDirectory):
        raise RuntimeError("Not implemented")

    d_ino = 1 # Not the real one
    d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly
               # handle a return of DT_UNKNOWN.")
    entries = []
    used = 0
    # fdesc.listdir continues from where it stopped
    for name in fdesc.listdir():
        entry = packing_callback(used, d_ino, d_type, name)
        if used + len(entry) > count:
            # Report to a further call
            fdesc.cur_listdir.append(name)
            break
        entries.append(entry)
        used += len(entry)
    return b"".join(entries)
def ioctl(
self, fd, cmd, arg)
Stub for 'ioctl' syscall. Return the list of elements to pack back, depending on the target ioctl. If the ioctl is disallowed, return False.
def ioctl(self, fd, cmd, arg):
    """Stub for 'ioctl' syscall
    Return the list of element to pack back depending on target ioctl
    If the ioctl is disallowed, return False

    Raise ValueError if the request is both allowed and disallowed, KeyError
    if no rule matches, RuntimeError if an allowed command has no handler.
    @arg is not used.
    """
    # None acts as a wildcard on either side of a (fd, cmd) rule
    candidates = [(fd, cmd), (None, cmd), (fd, None)]
    allowed = any(rule in self.ioctl_allowed for rule in candidates)
    disallowed = any(rule in self.ioctl_disallowed for rule in candidates)

    if allowed and disallowed:
        raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd))
    if disallowed:
        return False
    if not allowed:
        raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd))

    if cmd == termios.TCGETS:
        return 0, 0, 0, 0
    if cmd == termios.TIOCGWINSZ:
        # struct winsize
        # {
        #   unsigned short ws_row;	/* rows, in characters */
        #   unsigned short ws_col;	/* columns, in characters */
        #   unsigned short ws_xpixel;	/* horizontal size, pixels */
        #   unsigned short ws_ypixel;	/* vertical size, pixels */
        # };
        return 1000, 360, 1000, 1000
    if cmd == termios.TIOCSWINSZ:
        # Ignore it
        return None
    raise RuntimeError("Not implemented")
def lstat(
self, path)
Get file status through path (not following links)
def lstat(self, path):
    """Get file status through path (not following links)"""
    filesystem = self.filesystem
    return filesystem.getattr_(path, follow_link=False)
def mmap(
self, addr, len_, prot, flags, fd, off, vmmngr)
Stub for 'mmap' syscall
'mmap2' must be implemented by calling this function with off * 4096
def mmap(self, addr, len_, prot, flags, fd, off, vmmngr):
    """Stub for 'mmap' syscall

    'mmap2' must be implemented by calling this function with off * 4096

    @addr: requested address; 0 selects self.mmap_current
    @len_: length of the mapping
    @prot: not used, pages are always added as PAGE_READ|PAGE_WRITE
    @flags: only MAP_FIXED (0x10) and MAP_ANONYMOUS (0x20) are inspected
    @fd: descriptor providing the content; 0xffffffff stands for "no file"
    @off: offset of the content in the file
    @vmmngr: memory manager receiving the pages and the data
    Return the address the data has been written to
    """
    if addr == 0:
        addr = self.mmap_current
        # Reserve the area for the next address-less request
        self.mmap_current += (len_ + 0x1000) & ~0xfff

    all_mem = vmmngr.get_all_memory()
    # Inclusive (first, last) ranges of the memory already mapped
    mapped = interval(
        [
            (start, start + info["size"] - 1)
            for start, info in viewitems(all_mem)
        ]
    )

    MAP_FIXED = 0x10
    if flags & MAP_FIXED:
        # Alloc missing and override
        missing = interval([(addr, addr + len_ - 1)]) - mapped
        for start, stop in missing:
            vmmngr.add_memory_page(
                start,
                PAGE_READ|PAGE_WRITE,
                b"\x00" * (stop - start + 1),
                "mmap allocated"
            )
    else:
        # Find first candidate segment nearby addr
        for start, stop in mapped:
            if stop < addr:
                continue
            # Candidate: page-aligned address past the end of this segment
            rounded = (stop + 1 + 0x1000) & ~0xfff
            if (interval([(rounded, rounded + len_)]) & mapped).empty:
                addr = rounded
                break
        else:
            # No candidate has been selected: addr itself must be free
            assert (interval([(addr, addr + len_)]) & mapped).empty

        vmmngr.add_memory_page(
            addr,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * len_,
            "mmap allocated"
        )

    if fd == 0xffffffff:
        MAP_ANONYMOUS = 0x20    # mman.h
        # fd and offset are ignored if MAP_ANONYMOUS flag is present
        if not(flags & MAP_ANONYMOUS) and off != 0:
            raise RuntimeError("Not implemented")
        data = b"\x00" * len_
    else:
        fdesc = self.file_descriptors[fd]
        # Read the content, then restore the descriptor's position
        cur_pos = fdesc.tell()
        fdesc.seek(off)
        data = fdesc.read(len_)
        fdesc.seek(cur_pos)

    vmmngr.set_mem(addr, data)
    return addr
def next_fd(
self)
def next_fd(self):
    """Return the lowest descriptor number missing from
    self.file_descriptors

    The number of entries is not a safe answer: as soon as the keys are not
    contiguous it can designate a descriptor already in use.
    """
    fd = 0
    while fd in self.file_descriptors:
        fd += 1
    return fd
def open_(
self, path, flags, follow_link=True)
Stub for 'open' syscall
def open_(self, path, flags, follow_link=True):
    """Stub for 'open' syscall"""
    filesystem = self.filesystem
    return filesystem.open_(path, flags, follow_link=follow_link)
def read(
self, fd, count)
Stub for 'read' syscall
def read(self, fd, count):
    """Stub for 'read' syscall

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.read(count)
def socket(
self, family, type_, protocol)
Stub for 'socket' syscall
def socket(self, family, type_, protocol):
    """Stub for 'socket' syscall"""
    network = self.network
    return network.socket(family, type_, protocol)
def stat(
self, path)
Get file status through path
def stat(self, path):
    """Get file status through path"""
    filesystem = self.filesystem
    return filesystem.getattr_(path)
def write(
self, fd, data)
Stub for 'write' syscall
def write(self, fd, data):
    """Stub for 'write' syscall

    Return len(@data), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        fdesc.write(data)
        return len(data)
    return None
class LinuxEnvironment_x86_64
A LinuxEnvironment groups together the information needed to simulate a Linux-like environment
class LinuxEnvironment_x86_64(LinuxEnvironment):
    """LinuxEnvironment specialization for the "x86_64" platform"""

    platform_arch = sys_machine = b"x86_64"

    # open(2) flag values used on this platform
    O_RDONLY = 0
    O_ACCMODE = 0x3
    O_NONBLOCK = 0x800
    O_LARGEFILE = 0x8000
    O_DIRECTORY = 0x10000
    O_CLOEXEC = 0x80000
Ancestors (in MRO)
- LinuxEnvironment_x86_64
- LinuxEnvironment
- builtins.object
Class variables
var O_ACCMODE
var O_CLOEXEC
var O_DIRECTORY
var O_LARGEFILE
var O_NONBLOCK
var O_RDONLY
var base_time
var brk_current
var file_descriptors
var filesystem_base
var ioctl_allowed
var ioctl_disallowed
var mmap_current
var process_pid
var process_tid
var sys_machine
var sys_nodename
var sys_release
var sys_sysname
var sys_version
var user_egid
var user_euid
var user_gid
var user_name
var user_uid
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create the standard streams, the default ioctl policy, the sandboxed
    filesystem and the network stub"""
    std_streams = (
        FileDescriptorSTDIN(0),
        FileDescriptorSTDOUT(1),
        FileDescriptorSTDERR(2),
    )
    # The standard streams are owned by the simulated user
    for stream in std_streams:
        stream.uid = self.user_uid
        stream.gid = self.user_gid
    self.file_descriptors = dict(enumerate(std_streams))

    # The same three terminal requests are accepted on descriptors 0 and 1
    tty_cmds = (termios.TCGETS, termios.TIOCGWINSZ, termios.TIOCSWINSZ)
    self.ioctl_allowed = [(fd, cmd) for fd in (0, 1) for cmd in tty_cmds]
    self.ioctl_disallowed = [
        (2, termios.TCGETS),
        (0, termios.TCSETSW),
    ]

    self.filesystem = FileSystem(self.filesystem_base, self)
    self.network = Networking(self)
def brk(
self, addr, vmmngr)
Stub for 'brk' syscall
def brk(self, addr, vmmngr):
    """Stub for 'brk' syscall

    @addr: requested break; 0 only queries the current one
    @vmmngr: memory manager receiving the missing pages
    Return the resulting break
    """
    if addr == 0:
        # Query only: nothing is allocated
        return self.brk_current

    used = interval(
        [
            (base, base + page["size"] - 1)
            for base, page in viewitems(vmmngr.get_all_memory())
        ]
    )

    # Alloc missing and override
    for first, last in interval([(self.brk_current, addr)]) - used:
        vmmngr.add_memory_page(
            first,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * (last - first + 1),
            "BRK"
        )

    self.brk_current = addr
    return addr
def clock_gettime(
self)
def clock_gettime(self):
    """Return the current simulated time and advance it by one"""
    now = self.base_time
    self.base_time = now + 1
    return now
def close(
self, fd)
Stub for 'close' syscall
def close(self, fd):
    """Stub for 'close' syscall

    Return the result of the descriptor's close(), or None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    return None if fdesc is None else fdesc.close()
def fstat(
self, fd)
Get file status through fd
def fstat(self, fd):
    """Get file status through fd

    Return None if @fd is unknown
    """
    fdesc = self.file_descriptors.get(fd)
    if fdesc is not None:
        return fdesc.stat()
    return None
def getdents(
self, fd, count, packing_callback)
Stub for 'getdents' syscall
'getdents64' must be handled by caller (only the structure layout is modified)
@fd: getdents' fd argument @count: getdents' count argument @packing_callback(cur_len, d_ino, d_type, name) -> entry
def getdents(self, fd, count, packing_callback):
    """Stub for 'getdents' syscall

    'getdents64' must be handled by caller (only the structure layout is
    modified)

    @fd: getdents' fd argument
    @count: getdents' count argument
    @packing_callback(cur_len, d_ino, d_type, name) -> entry
    """
    fdesc = self.file_descriptors[fd]
    if not isinstance(fdesc, FileDescriptorDirectory):
        raise RuntimeError("Not implemented")

    d_ino = 1 # Not the real one
    d_type = 0 # DT_UNKNOWN (getdents(2) "All applications must properly
               # handle a return of DT_UNKNOWN.")
    entries = []
    used = 0
    # fdesc.listdir continues from where it stopped
    for name in fdesc.listdir():
        entry = packing_callback(used, d_ino, d_type, name)
        if used + len(entry) > count:
            # Report to a further call
            fdesc.cur_listdir.append(name)
            break
        entries.append(entry)
        used += len(entry)
    return b"".join(entries)
def ioctl(
self, fd, cmd, arg)
Stub for 'ioctl' syscall. Return the list of elements to pack back, depending on the target ioctl. If the ioctl is disallowed, return False.
def ioctl(self, fd, cmd, arg):
    """Stub for 'ioctl' syscall

    @fd: file descriptor number
    @cmd: ioctl request number
    @arg: ioctl argument (not read by this stub)

    A request matches a rule set when (fd, cmd), (None, cmd) or (fd, None)
    belongs to it; rule sets are self.ioctl_allowed and
    self.ioctl_disallowed.

    Return the elements to pack back, depending on the target ioctl:
    - TCGETS: (0, 0, 0, 0)
    - TIOCGWINSZ: (1000, 360, 1000, 1000)
    - TIOCSWINSZ: None (the request is ignored)
    If the ioctl is disallowed, return False.

    Raise ValueError if the request is both allowed and disallowed,
    RuntimeError if it is allowed but not handled, KeyError if no rule
    matches.
    """
    candidates = ((fd, cmd), (None, cmd), (fd, None))
    is_allowed = any(key in self.ioctl_allowed for key in candidates)
    is_disallowed = any(key in self.ioctl_disallowed for key in candidates)

    if is_allowed and is_disallowed:
        raise ValueError("fd: %x, cmd: %x is allowed and disallowed" % (fd, cmd))

    if not is_allowed:
        if is_disallowed:
            return False
        raise KeyError("Unknown ioctl fd:%x cmd:%x" % (fd, cmd))

    if cmd == termios.TCGETS:
        return 0, 0, 0, 0

    if cmd == termios.TIOCGWINSZ:
        # struct winsize
        # {
        #   unsigned short ws_row;    /* rows, in characters */
        #   unsigned short ws_col;    /* columns, in characters */
        #   unsigned short ws_xpixel; /* horizontal size, pixels */
        #   unsigned short ws_ypixel; /* vertical size, pixels */
        # };
        return 1000, 360, 1000, 1000

    if cmd == termios.TIOCSWINSZ:
        # Ignore it
        return None

    raise RuntimeError("Not implemented")
def lstat(
self, path)
Get file status through path (not following links)
def lstat(self, path):
    """Get file status through path (not following links)

    @path: path handed to self.filesystem.getattr_, called with
        follow_link=False
    """
    filesystem = self.filesystem
    return filesystem.getattr_(path, follow_link=False)
def mmap(
self, addr, len_, prot, flags, fd, off, vmmngr)
Stub for 'mmap' syscall
'mmap2' must be implemented by calling this function with off * 4096
def mmap(self, addr, len_, prot, flags, fd, off, vmmngr):
    """Stub for 'mmap' syscall

    'mmap2' must be implemented by calling this function with off * 4096

    @addr: requested address; 0 means "take self.mmap_current"
    @len_: length of the mapping, in bytes
    @prot: requested protection (not used here: pages are always created
        with PAGE_READ|PAGE_WRITE)
    @flags: mmap flags; only MAP_FIXED (0x10) and MAP_ANONYMOUS (0x20) are
        inspected
    @fd: file descriptor number, or 0xffffffff for a mapping without file
    @off: offset in the file designated by @fd
    @vmmngr: memory manager receiving the new pages and their content

    Return the address finally used for the mapping.
    Raise RuntimeError("Not implemented") when @fd is 0xffffffff while
    MAP_ANONYMOUS is not set and @off is not 0.
    """
    if addr == 0:
        # No address requested: use the internal cursor, then move it past
        # this mapping
        addr = self.mmap_current
        self.mmap_current += (len_ + 0x1000) & ~0xfff

    # Inclusive (start, stop) ranges of everything already mapped
    all_mem = vmmngr.get_all_memory()
    mapped = interval(
        [
            (start, start + info["size"] - 1)
            for start, info in viewitems(all_mem)
        ]
    )

    MAP_FIXED = 0x10
    if flags & MAP_FIXED:
        # Alloc missing and override
        # (only the parts of [addr, addr + len_ - 1] not mapped yet get a
        # new page; already mapped parts are overwritten by set_mem below)
        missing = interval([(addr, addr + len_ - 1)]) - mapped
        for start, stop in missing:
            vmmngr.add_memory_page(
                start,
                PAGE_READ|PAGE_WRITE,
                b"\x00" * (stop - start + 1),
                "mmap allocated"
            )
    else:
        # Find first candidate segment nearby addr
        for start, stop in mapped:
            if stop < addr:
                # Segment ends before the requested address
                continue
            # Candidate: page-aligned address after this segment
            rounded = (stop + 1 + 0x1000) & ~0xfff
            if (interval([(rounded, rounded + len_)]) & mapped).empty:
                addr = rounded
                break
        else:
            # Loop ended without 'break': no candidate was selected, so
            # the requested range itself must not overlap any mapping
            assert (interval([(addr, addr + len_)]) & mapped).empty

        vmmngr.add_memory_page(
            addr,
            PAGE_READ|PAGE_WRITE,
            b"\x00" * len_,
            "mmap allocated"
        )

    if fd == 0xffffffff:
        # NOTE(review): presumably -1 as a 32-bit value, i.e. "no file";
        # confirm callers truncate fd to 32 bits
        MAP_ANONYMOUS = 0x20    # mman.h
        # fd and offset are ignored if MAP_ANONYMOUS flag is present
        if not(flags & MAP_ANONYMOUS) and off != 0:
            raise RuntimeError("Not implemented")
        data = b"\x00" * len_
    else:
        # File-backed mapping: read len_ bytes at offset off, then restore
        # the descriptor's position
        fdesc = self.file_descriptors[fd]
        cur_pos = fdesc.tell()
        fdesc.seek(off)
        data = fdesc.read(len_)
        fdesc.seek(cur_pos)

    vmmngr.set_mem(addr, data)
    return addr
def next_fd(
self)
def next_fd(self):
    """Return the number to use for the next file descriptor

    This is the current number of entries in self.file_descriptors.
    """
    known_descriptors = self.file_descriptors
    return len(known_descriptors)
def open_(
self, path, flags, follow_link=True)
Stub for 'open' syscall
def open_(self, path, flags, follow_link=True):
    """Stub for 'open' syscall

    @path: path to open
    @flags: open flags
    @follow_link: forwarded as the 'follow_link' keyword argument

    The call is delegated to self.filesystem.open_, whose result is
    returned unchanged.
    """
    opener = self.filesystem.open_
    return opener(path, flags, follow_link=follow_link)
def read(
self, fd, count)
Stub for 'read' syscall
def read(self, fd, count):
    """Stub for 'read' syscall

    @fd: file descriptor number
    @count: forwarded to the descriptor's read()

    Return None when @fd is not found in self.file_descriptors, otherwise
    the result of the descriptor's read(@count).
    """
    source = self.file_descriptors.get(fd)
    return None if source is None else source.read(count)
def socket(
self, family, type_, protocol)
Stub for 'socket' syscall
def socket(self, family, type_, protocol):
    """Stub for 'socket' syscall

    @family, @type_, @protocol: forwarded, in this order, to
        self.network.socket, whose result is returned unchanged
    """
    network = self.network
    return network.socket(family, type_, protocol)
def stat(
self, path)
Get file status through path
def stat(self, path):
    """Get file status through path

    @path: path handed to self.filesystem.getattr_ (no other argument is
        given, so its own defaults apply)
    """
    filesystem = self.filesystem
    return filesystem.getattr_(path)
def write(
self, fd, data)
Stub for 'write' syscall
def write(self, fd, data):
    """Stub for 'write' syscall

    @fd: file descriptor number
    @data: content handed to the descriptor's write()

    Return None when @fd is not found in self.file_descriptors, otherwise
    len(@data) (the descriptor's own return value is not used).
    """
    sink = self.file_descriptors.get(fd)
    if sink is not None:
        sink.write(data)
        return len(data)
    return None
class Networking
Network abstraction
class Networking(object):
    """Network abstraction

    Sockets created here are registered in the file descriptor table of
    the Linux environment given at construction.
    """

    def __init__(self, linux_env):
        """@linux_env: environment providing next_fd() and file_descriptors"""
        self.linux_env = linux_env

    def socket(self, family, type_, protocol):
        """Create a socket descriptor and return its number

        @family, @type_, @protocol: forwarded to FileDescriptorSocket

        The number comes from linux_env.next_fd(); the new descriptor is
        stored under that number in linux_env.file_descriptors.
        """
        env = self.linux_env
        number = env.next_fd()
        env.file_descriptors[number] = FileDescriptorSocket(
            number, family, type_, protocol
        )
        return number
Ancestors (in MRO)
- Networking
- builtins.object
Static methods
def __init__(
self, linux_env)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, linux_env):
    """Keep a reference to the Linux environment

    @linux_env: stored as self.linux_env
    """
    self.linux_env = linux_env
def socket(
self, family, type_, protocol)
def socket(self, family, type_, protocol):
    """Create a socket descriptor and return its number

    @family, @type_, @protocol: forwarded to FileDescriptorSocket

    The number comes from self.linux_env.next_fd(); the new descriptor is
    stored under that number in self.linux_env.file_descriptors.
    """
    env = self.linux_env
    number = env.next_fd()
    env.file_descriptors[number] = FileDescriptorSocket(
        number, family, type_, protocol
    )
    return number
Instance variables
var linux_env
class REGEXP_T
Compiled regular expression object.
Ancestors (in MRO)
- REGEXP_T
- builtins.object
Class variables
var flags
var groupindex
var groups
var pattern
class StatFSInfo
StatFSInfo(f_type, f_bsize, f_blocks, f_bfree, f_bavail, f_files, f_ffree, f_fsid, f_namelen, f_frsize, f_flags, f_spare)
Ancestors (in MRO)
- StatFSInfo
- builtins.tuple
- builtins.object
Class variables
var f_bavail
var f_bfree
var f_blocks
var f_bsize
var f_ffree
var f_files
var f_flags
var f_frsize
var f_fsid
var f_namelen
var f_spare
var f_type
class StatInfo
StatInfo(st_dev, st_ino, st_nlink, st_mode, st_uid, st_gid, st_rdev, st_size, st_blksize, st_blocks, st_atime, st_atimensec, st_mtime, st_mtimensec, st_ctime, st_ctimensec)
Ancestors (in MRO)
- StatInfo
- builtins.tuple
- builtins.object
Class variables
var st_atime
var st_atimensec
var st_blksize
var st_blocks
var st_ctime
var st_ctimensec
var st_dev
var st_gid
var st_ino
var st_mode
var st_mtime
var st_mtimensec
var st_nlink
var st_rdev
var st_size
var st_uid