zio源码

#!/usr/bin/env python

from __future__ import print_function
from __future__ import division  #在较旧的Python版本中,对整数执行常规除法运算的结果与使用双斜杠类似。

__version__ = "2.1.1"
__project__ = "https://github.com/zTrix/zio"

import os
import sys
import re
import struct
import functools
import socket
import signal
import ast
import time
import datetime
import errno
import select
import binascii
import tempfile
# for ProcessIO below
import pty
import shlex
import fcntl
import gc
import atexit
import resource
import termios
import tty
# works for python2.6 python2.7 and python3
from distutils.spawn import find_executable


python_version_major = sys.version_info[0]      # do not use sys.version_info.major which is not available in python2.6

# python2 python3 shim
if python_version_major < 3:
    input = raw_input           # pylint: disable=undefined-variable

    class TimeoutError(OSError): pass   # from ptyprocess.py, issubclass(TimeoutError, OSError) == True
else:
    unicode = str
    unichr = chr

try:
    from io import BytesIO
except ImportError:
    from StringIO import StringIO as BytesIO

if True:
    # termcolor handled using bytes instead of unicode
    # since termcolor use MIT license, SATA license above should be OK
    ATTRIBUTES = dict( list(zip([ 'bold', 'dark', '', 'underline', 'blink', '', 'reverse', 'concealed' ], list(range(1, 9)))))
    del ATTRIBUTES['']
    HIGHLIGHTS = dict( list(zip([ 'on_grey', 'on_red', 'on_green', 'on_yellow', 'on_blue', 'on_magenta', 'on_cyan', 'on_white' ], list(range(40, 48)))))
    COLORS = dict(list(zip(['grey', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', ], list(range(30, 38)))))
    RESET = b'33[0m'

    def colored(text, color=None, on_color=None, attrs=None):
        fmt_str = b'33[%dm%s'
        if color is not None: text = fmt_str % (COLORS[color], text)
        if on_color is not None: text = fmt_str % (HIGHLIGHTS[on_color], text)
        if attrs is not None:
            for attr in attrs:
                text = fmt_str % (ATTRIBUTES[attr], text)

        text += RESET
        return text

# -------------------------------------------------
# =====> packing/unpacking related functions <=====

def convert_packing(endian, bits, arg, autopad=False, automod=True):
    """
    given endian, bits spec, do the following
        convert between bytes <--> int
        convert between bytes <--> [int]

    params:
        endian: < for little endian, > for big endian
        bits: bit size of packing, valid values are 8, 16, 32, 64
        arg: integer or bytes
        autopad: auto pad input string to required length if needed
    """
    pfs = {8: 'B', 16: 'H', 32: 'I', 64: 'Q'}

    if isinstance(arg, unicode):
        arg = arg.encode('latin-1')

    if isinstance(arg, bytearray):
        arg = bytes(arg)

    if isinstance(arg, bytes):      # bytes -> int or [int]
        c = bits // 8
        r = len(arg) % c
        if r != 0:
            if autopad:
                arg = arg[:len(arg) // c * c] + (arg[-r:].ljust(c, b'x00') if endian == '<' else arg[-r:].rjust(c, b'x00'))
            else:
                raise ValueError('bad input length, expected multiple of %d, got %d. Fix length manually or use autopad=True' % (c, len(arg)))
        unpacked = struct.unpack(endian + pfs[bits] * (len(arg) // c), arg)
        return list(unpacked) if len(unpacked) > 1 else unpacked[0]
    else:                           # int or [int] -> bytes
        args = list(arg) if isinstance(arg, (list, tuple)) else [arg]
        if automod:
            args = [i % (1<<bits) for i in args]
        return struct.pack(endian + pfs[bits] * len(args), *args)

l8 = functools.partial(convert_packing, '<', 8)
b8 = functools.partial(convert_packing, '>', 8)
l16 = functools.partial(convert_packing, '<', 16)
b16 = functools.partial(convert_packing, '>', 16)
l32 = functools.partial(convert_packing, '<', 32)
b32 = functools.partial(convert_packing, '>', 32)
l64 = functools.partial(convert_packing, '<', 64)
b64 = functools.partial(convert_packing, '>', 64)

# -------------------------------------------------
# =====> utility functions <=====

def bytes2hex(s):
    '''
    Union{bytes, unicode} -> bytes
    '''
    if isinstance(s, unicode):
        s = s.encode('latin-1')
    return binascii.hexlify(s)

def hex2bytes(s, autopad=False):
    '''
    bytes -> bytes
    '''
    if isinstance(s, unicode):
        s = s.encode('latin-1')
    s = s.strip()
    if len(s) % 2 == 1:
        if autopad == 'left' or autopad == True:
            s = b'0' + s
        elif autopad == 'right':
            s = s + b'0'
        else:
            raise ValueError('invalid length of hex bytes: %d, should be multiple of 2. Use autopad=True to fix automatically' % len(s))
    return binascii.unhexlify(s)

tohex = bytes2hex
unhex = hex2bytes

if python_version_major < 3:
    def xor(a, b):
        '''
        bytes -> bytes -> bytes
        the first param a must be longer than or equal to the length of the second param
        '''
        return b''.join([chr(ord(c) ^ ord(b[i % len(b)])) for i, c in enumerate(a)])
else:
    def xor(a, b):
        '''
        bytes -> bytes -> bytes
        the first param a must be longer than or equal to the length of the second param
        '''
        return bytes([c ^ b[i % len(b)] for i, c in enumerate(a)])

def is_hostport_tuple(target):
    return type(target) == tuple and len(target) == 2 and isinstance(target[1], int) and target[1] >= 0 and target[1] < 65536

def match_pattern(pattern, byte_buf):
    '''
    pattern -> byte_buf -> index span # (-1, -1) for not found)
    pattern could be bytes or re objects or lambda function which returns index span
    '''
    if isinstance(pattern, unicode):
        pattern = pattern.encode('latin-1')
    if isinstance(pattern, bytes):
        i = byte_buf.find(pattern)
        if i > -1:
            return (i, i + len(pattern))
        else:
            return (-1, -1)
    elif hasattr(pattern, 'match') and hasattr(pattern, 'search'):
        mo = pattern.search(byte_buf)
        if not mo:
            return (-1, -1)
        else:
            return mo.span()
    elif callable(pattern):
        return pattern(byte_buf)

def write_stdout(data):
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(data)
    else:
        if python_version_major < 3:
            sys.stdout.write(data)
        else:
            sys.stdout.write(data.decode())
    sys.stdout.flush()

def write_stderr(data):
    if hasattr(sys.stderr, 'buffer'):
        sys.stderr.buffer.write(data)
    else:
        if python_version_major < 3:
            sys.stderr.write(data)
        else:
            sys.stderr.write(data.decode())
    sys.stderr.flush()

def write_debug(f, data, show_time=True, end=b'
'):
    if not f:
        return
    if isinstance(data, unicode):
        data = data.encode('latin-1')
    if show_time:
        now = datetime.datetime.now().strftime('[%Y-%m-%d_%H:%M:%S]').encode()
        f.write(now)
        f.write(b' ')
    f.write(data)
    if end:
        f.write(end)
    f.flush()
    
def ttyraw(fd, when=tty.TCSAFLUSH, echo=False, raw_in=True, raw_out=False):
    mode = tty.tcgetattr(fd)[:]
    if raw_in:
        mode[tty.IFLAG] = mode[tty.IFLAG] & ~(tty.BRKINT | tty.ICRNL | tty.INPCK | tty.ISTRIP | tty.IXON)
        mode[tty.CFLAG] = mode[tty.CFLAG] & ~(tty.CSIZE | tty.PARENB)
        mode[tty.CFLAG] = mode[tty.CFLAG] | tty.CS8
        if echo:
            mode[tty.LFLAG] = mode[tty.LFLAG] & ~(tty.ICANON | tty.IEXTEN | tty.ISIG)
        else:
            mode[tty.LFLAG] = mode[tty.LFLAG] & ~(tty.ECHO | tty.ICANON | tty.IEXTEN | tty.ISIG)
    if raw_out:
        mode[tty.OFLAG] = mode[tty.OFLAG] & ~(tty.OPOST)
    mode[tty.CC][tty.VMIN] = 1
    mode[tty.CC][tty.VTIME] = 0
    tty.tcsetattr(fd, when, mode)

# -------------------------------------------------
# =====> zio class modes and params <=====

PIPE = 'pipe'           # io mode (process io): send all characters untouched, but use PIPE, so libc cache may apply
TTY = 'tty'             # io mode (process io): normal tty behavier, support Ctrl-C to terminate, and auto 
 to display more readable lines for human
TTY_RAW = 'ttyraw'      # io mode (process io): send all characters just untouched

def COLORED(f, color='cyan', on_color=None, attrs=None):
    return lambda s : colored(f(s), color, on_color, attrs)

# read/write transform functions
# bytes -> (printable) bytes
if python_version_major < 3:
    def REPR(s): return b'b' + repr(s) + b'
'
else:
    def REPR(s): return str(s).encode('latin-1') + b'
'

def EVAL(s):    # now you are not worried about pwning yourself, do not use ast.literal_eval because of 1. encoding issue 2. we only eval string
    st = 0      # 0 for normal, 1 for escape, 2 for xXX
    ret = []
    i = 0
    while i < len(s):
        c = s[i:i+1]    # current byte, for python2/3 compatibility
        if st == 0:
            if c == b'\':
                st = 1
            else:
                ret.append(c)
        elif st == 1:
            if c in (b'"', b"'", b"\", b"t", b"n", b"r"):
                if c == b't':
                    ret.append(b'	')
                elif c == b'n':
                    ret.append(b'
')
                elif c == b'r':
                    ret.append(b'
')
                else:
                    ret.append(c)
                st = 0
            elif c == b'x':
                st = 2
            else:
                raise ValueError('invalid repr of str %s' % s)
        else:
            num = int(s[i:i+2], 16)
            assert 0 <= num < 256
            if python_version_major < 3:
                ret.append(chr(num))
            else:
                ret.append(bytes([num]))
            st = 0
            i += 1
        i += 1
    return b''.join(ret)

def HEX(s): return bytes2hex(s) + b'
'
TOHEX = HEX
def UNHEX(s): return hex2bytes(s)

def HEXDUMP(byte_buf, width=16, indent=0):
    length = len(byte_buf)
    lines = (length // width) + (length % width != 0)
    ret = []

    printable_low = b' '
    printable_high = b'~'

    hexcode_width = 0

    for lino in range(lines):
        index_begin = lino * width
        line = byte_buf[index_begin:index_begin+width]

        prefix = format('%08x' % index_begin).encode()
        hexcode = b''
        printable = b''

        for gi in range(0, len(line), 2):
            gd = line[gi:gi+2]
            hexcode += b' ' + binascii.hexlify(gd)
            
            printable += gd[0:1] if printable_low <= gd[0:1] <= printable_high else b'.'
            if len(gd) == 2:
                printable += gd[1:2] if printable_low <= gd[1:2] <= printable_high else b'.'

        if len(hexcode) > hexcode_
            hexcode_width = len(hexcode)
        elif len(hexcode) < hexcode_
            hexcode = hexcode.ljust(hexcode_width, b' ')

        ret.append(b'%s%s:%s  %s
' % (b' ' * indent, prefix, hexcode, printable))
    return b''.join(ret)

HEXDUMP_INDENT4 = functools.partial(HEXDUMP, indent=4)
HEXDUMP_INDENT8 = functools.partial(HEXDUMP, indent=8)
HEXDUMP_INDENT16 = functools.partial(HEXDUMP, indent=16)

if python_version_major < 3:
    def BIN(s): return b' '.join([format(ord(x),'08b') for x in str(s)]) + b'
'
else:
    def BIN(s): return b' '.join([format(x,'08b').encode() for x in s]) + b'
'

def UNBIN(s, autopad=False):
    s = bytes(filter(lambda x: x in b'01', s))
    if len(s) % 8 != 0:
        extra = 8 - len(s) % 8
        if autopad == 'left' or autopad == True:
            s = (b'0' * extra) + s
        elif autopad == 'right':
            s = s + (b'0' * extra)
        else:
            raise ValueError('invalid length of 01 bytestring: %d, should be multiple of 8. Use autopad=True to fix automatically' % len(s))
    if python_version_major < 3:
        return b''.join([chr(int(s[x:x+8],2)) for x in range(0, len(s), 8)])
    else:
        return bytes([int(s[x:x+8],2) for x in range(0, len(s), 8)])

def RAW(s): return s
def NONE(s): return b''

# -------------------------------------------------
# =====> zio helper functions <=====

def select_ignoring_useless_signal(iwtd, owtd, ewtd, timeout=None):
    '''This is a wrapper around select.select() that ignores signals. If
    select.select raises a select.error exception and errno is an EINTR
    error then it is ignored. Mainly this is used to ignore sigwinch
    (terminal resize). '''

    # if select() is interrupted by a signal (errno==EINTR) then
    # we loop back and enter the select() again.
    if timeout is not None:
        end_time = time.time() + timeout
    while True:
        try:
            return select.select(iwtd, owtd, ewtd, timeout)
        except select.error as err:
            if select.error == OSError:     # python3 style
                eno = err.errno
            else:
                err = sys.exc_info()[1]     # python2 style
                eno = err[0]
            if eno == errno.EINTR:
                # if we loop back we have to subtract the
                # amount of time we already waited.
                if timeout is not None:
                    timeout = end_time - time.time()
                    if timeout < 0:
                        return([], [], [])
            else:
                # something else caused the select.error, so
                # this actually is an exception.
                raise

# zio class here
class zio(object):
    '''
    zio: unified io interface for both socket io and process io
    '''
    
    def __init__(self, target,
        # common params
        timeout=None,
        logfile=None,
        print_read=True,
        print_write=True,
        debug=None,
        # ProcessIO params 
        stdin=PIPE,
        stdout=TTY_RAW,
        cwd=None,
        env=None,
        sighup=signal.SIG_DFL,
        write_delay=0.05,
        read_echoback=True,
    ):
        """
        zio is an easy-to-use io library for pwning development, supporting an unified interface for local process pwning and remote tcp socket io
        note that zio fully operates at bytes level instead of unicode, so remember to use bytes when passing arguments to zio methods

        example:

        io = zio(('localhost', 80), print_read=COLORED(RAW, 'yellow'), print_write=HEX)
        io = zio(socket.create_connection(('127.0.0.1', 80)))

        io = zio('ls -l')
        io = zio(['ls', '-l'])

        params:
            target(required): the target object for zio to operate with, could be socket (addr, port) tuple, or connected socket object, or cmd line for spawning process
            print_read: bool | [COLORED]{NONE, RAW, REPR, HEX}, if set, transform and print all the data read from target
            print_write: bool | [COLORED]{NONE, RAW, REPR, HEX}, if set, transform and print all the data sent out
            timeout: int, the global timeout for this zio object
            logfile: where to print traffic data in or out from target, default to sys.stderr
            debug: if set to a file object(must be opened using binary mode), will provide info for debugging zio internal. leave it to None by default.
            stdin(ProcessIO only): {PIPE, TTY, TTY_RAW} which mode to choose for child process stdin, PIPE is recommended for programming interface, since you will need to take care of tty control chars by hand when call write methods if stdin set to TTY mode.
            stdout(ProcessIO only): {PIPE, TTY, TTY_RAW} which mode to choose for child process stdout
            cwd(ProcessIO only): the working directory to spawn child process
            env(ProcessIO only): env variables for child process
            write_delay(ProcessIO only): write delay for child process to prevent writing too fast
        """

        if not target:
            raise ValueError('cmdline or socket not provided for zio, try zio("ls -l")')

        self.target = target
        self.print_read = print_read
        self.print_write = print_write
        if logfile is None:
            self.logfile = sys.stderr
        else:
            self.logfile = logfile  # must be opened using 'rb'

        # zio object itself is a buffered reader/writer
        self.buffer = bytearray()

        self.debug = debug

        if isinstance(timeout, int) and timeout > 0:
            self.timeout = timeout
        else:
            self.timeout = 10

        if is_hostport_tuple(self.target) or isinstance(self.target, socket.socket):
            self.io = SocketIO(self.target, timeout=self.timeout, debug=debug)
        else:
            self.io = ProcessIO(self.target, timeout=self.timeout, debug=debug,
                stdin=stdin,
                stdout=stdout,
                cwd=cwd,
                env=env,
                sighup=sighup,
                write_delay=write_delay,
                read_echoback=read_echoback,
                )

    def log_read(self, byte_buf):
        '''
        bytes -> IO bytes
        '''
        if self.print_read and byte_buf:   # should log when byte_buf is empty bytestring
            content = self.read_transform(byte_buf)
            if hasattr(self.logfile, 'buffer'):
                self.logfile.buffer.write(content)
            else:
                self.logfile.write(content)
            self.logfile.flush()

    def log_write(self, byte_buf):
        '''
        bytes -> IO bytes
        '''
        if self.print_write and byte_buf:   # should log when byte_buf is empty bytestring
            content = self.write_transform(byte_buf)
            if hasattr(self.logfile, 'buffer'):
                self.logfile.buffer.write(content)
            else:
                self.logfile.write(content)
            self.logfile.flush()

    @property
    def print_read(self):
        return self.read_transform is not None and self.read_transform is not NONE

    @print_read.setter
    def print_read(self, value):
        if value == True:
            self.read_transform = RAW
        elif value == False:
            self.read_transform = NONE
        elif callable(value):
            self.read_transform = value
        else:
            raise ValueError('bad print_read value')
        
        assert callable(self.read_transform)

    @property
    def print_write(self):
        return self.write_transform is not None and self.write_transform is not NONE

    @print_write.setter
    def print_write(self, value):
        if value == True:
            self.write_transform = RAW
        elif value == False:
            self.write_transform = NONE
        elif callable(value):
            self.write_transform = value
        else:
            raise ValueError('bad print_read value')
        
        assert callable(self.write_transform)

    def read(self, size=None):
        '''
        if size is -1 or None, then read all bytes available until EOF
        if size is a positive integer, read exactly `size` bytes and return
        raise EOFError if EOF occurred before full size read
        raise TimeoutError if Timeout occured
        '''
        is_read_all = size is None or size < 0
        incoming = None
        
        # log buffer content first
        if self.buffer:
            if is_read_all:
                self.log_read(bytes(self.buffer))
            else:
                self.log_read(bytes(self.buffer[:size]))

        while True:
            if is_read_all or len(self.buffer) < size:
                incoming = self.io.recv(1536)
                if incoming is None:
                    if is_read_all:
                        ret = bytes(self.buffer)
                        # self.buffer.clear()   # note: python2 does not support bytearray.clear()
                        self.buffer = bytearray()
                        return ret
                    else:
                        raise EOFError('EOF occured before full size read, buffer = %r' % self.buffer)
                self.buffer.extend(incoming)

            if not is_read_all and len(self.buffer) >= size:
                if incoming:
                    self.log_read(incoming[:len(incoming) + size - len(self.buffer)])
                ret = bytes(self.buffer[:size])
                self.buffer = self.buffer[size:]
                return ret
            else:
                self.log_read(incoming)

    read_exact = read

    def read_to_end(self):
        '''
        read all data until EOF
        '''
        return self.read(size=-1)

    read_all = read_to_end
    recvall = read_to_end

    def read_line(self, keep=True):
        content = self.read_until(b'
', keep=True)
        if not keep:
            content = content.rstrip(b'
')
        return content

    readline = read_line
    recvline = read_line    # for pwntools compatibility

    def read_until(self, pattern, keep=True):
        '''
        read until some bytes pattern found
        patter could be one of following:
        1. bytes
        2. re object(must compile using bytes rather than unicode, e.g: re.compile(b"something"))
        3. callable functions return True for found and False for not found
        4. lists of things above

        raise EOFError if EOF occurred before pattern found
        '''

        if not isinstance(pattern, (list, tuple)):
            pattern_list = [pattern]
        else:
            pattern_list = pattern

        log_pos = 0

        while True:
            for p in pattern_list:
                span = match_pattern(p, self.buffer)
                if span[0] > -1: # found
                    end_pos = span[1]
                    ret = self.buffer[:end_pos] if keep == True else self.buffer[:span[0]]
                    self.log_read(bytes(self.buffer[log_pos:end_pos]))
                    self.buffer = self.buffer[end_pos:]
                    return bytes(ret)

            self.log_read(bytes(self.buffer[log_pos:]))
            log_pos = len(self.buffer)

            incoming = self.io.recv(1536)
            if incoming is None:
                raise EOFError('EOF occured before pattern match, buffer = %r' % self.buffer)

            self.buffer.extend(incoming)

    readuntil = read_until
    recv_until = read_until
    recvuntil = read_until

    def read_some(self, size=None):
        '''
        just read 1 or more available bytes (less than size) and return
        '''
        if len(self.buffer):
            if size is None or size <= 0:
                ret = bytes(self.buffer)
                self.buffer = bytearray()
            else:
                ret = bytes(self.buffer[:size])
                self.buffer = self.buffer[size:]
            self.log_read(ret)
            return ret

        ret = self.io.recv(size)
        self.log_read(ret)
        return ret

    recv = read_some

    def read_until_timeout(self, timeout=1):
        '''
        read for some timeout, return current buffer plus whatever read
        '''
        end_time = time.time() + timeout

        if self.buffer:
            self.log_read(bytes(self.buffer))

        while True:
            r, _w, _e = select_ignoring_useless_signal([self.io.rfd], [], [], timeout)
            data = None
            if self.io.rfd in r:
                data = self.io.recv(1536)
                if data is None:
                    break
                elif data:
                    self.buffer.extend(data)
                    self.log_read(data)
                    break

            timeout = end_time - time.time()
            if timeout < 0:
                break

        if len(self.buffer):
            ret = bytes(self.buffer)
            self.buffer = bytearray()
            return ret
        return b''

    read_eager = read_until_timeout

    def readable(self):
        '''
        tell wether we have some data to read
        '''
        return select_ignoring_useless_signal([self.io.rfd], [], [], 0) == ([self.io.rfd], [], [])

    def write(self, byte_buf):
        '''
        write/sendall bytes and flush them all
        '''
        if not byte_buf:
            return 0
        if isinstance(byte_buf, unicode):
            byte_buf = byte_buf.encode('latin-1')   # will raise UnicodeEncodeError if code point larger than 255
        self.log_write(bytes(byte_buf))
        self.io.send(byte_buf)
        return len(byte_buf)

    send = write    # for pwntools compatibility
    sendall = write # for socket compatibility

    def write_line(self, byte_buf):
        '''
        write byte_buf and a linesep
        '''
        if isinstance(byte_buf, unicode):
            byte_buf = byte_buf.encode('latin-1')   # will raise UnicodeEncodeError if code point larger than 255
        return self.write(byte_buf + os.linesep.encode())

    sendline = write_line
    send_line = write_line
    writeline = write_line

    def write_lines(self, sequence):
        n = 0
        for s in sequence:
            n += self.write_line(s)
        return n

    writelines = write_lines

    def write_after(self, pattern, byte_buf):
        self.read_until(pattern)
        self.write(byte_buf)

    writeafter = write_after
    sendafter = write_after

    def write_line_after(self, pattern, byte_buf):
        self.read_until(pattern)
        self.writeline(byte_buf)

    writeline_after = write_line_after  # for human mistake
    sendline_after = write_line_after   # for human mistake
    sendlineafter = write_line_after    # for pwntools compatibility

    def send_eof(self):
        '''
        notify peer that we have done writing
        '''
        self.io.send_eof()

    sendeof = send_eof
    end = send_eof      # for zio 1.0 compatibility

    def interact(self, **kwargs):
        '''
        interact with current tty stdin/stdout
        '''
        if self.buffer:
            kwargs['buffered'] = bytes(self.buffer)
            self.buffer = bytearray()
        self.io.interact(**kwargs)

    interactive = interact      # for pwntools compatibility

    def close(self):
        '''
        close underlying io and free all resources
        '''
        self.io.close()

    def is_closed(self):
        '''
        tell whether this zio object is closed
        '''
        return self.io.is_closed()

    def is_eof_seen(self):
        '''
        tell whether we have received EOF from peer end
        '''
        return self.io.eof_seen

    def is_eof_sent(self):
        '''
        tell whether we have sent EOF to the peer 
        '''
        return self.io.eof_sent

    def flush(self):
        '''
        kept to act like a file-like object
        '''
        pass

    def fileno(self):
        '''
        return underlying os fileno, act like a file-like object
        '''
        return self.io.rfd

    def mode(self):
        return self.io.mode

    def exit_status(self):
        return self.io.exit_status

    exit_code = exit_status

    def gdb_hint(self, userscript=None, breakpoints=None):
        '''
        script: str
        breakpoints: List[Union{int, (int, keyword:str)}], example: [0x400419, (0x1009, 'libc.so')]
        '''
        pid = self.io.target_pid()
        if not pid:
            input('unable to find target pid to attach gdb')
            return
        
        gdb_cmd = ['attach %d' % pid, 'set disassembly-flavor intel']

        vmmap = open('/proc/%d/maps' % pid).read()
        vmmap_lines = vmmap.splitlines()

        if breakpoints:
            for b in breakpoints:
                if isinstance(b, (tuple, list)):
                    found = False
                    for line in vmmap_lines:
                        if b[1].lower() in line.lower():
                            base = int(line.split('-')[0], 16)
                            gdb_cmd.append('b *' + hex(base + b[0]))
                            found = True
                            break
                    if not found:
                        print('[ WARN ] keyword not found for breakpoint base address: %r' % b)
                elif isinstance(b, int):
                    gdb_cmd.append('b *' + hex(b))
                elif isinstance(b, type('')):
                    gdb_cmd.append('b *' + b)
                else:
                    print('[ WARN ] bad breakpoint: %r' % b)

        if not userscript:
            userscript = ''
        if isinstance(userscript, bytes):
            userscript = userscript.decode('utf-8')

        gdb_script = '
'.join(gdb_cmd) + '

' + userscript + '
'

        tf = tempfile.NamedTemporaryFile(mode="w", suffix='.zio.gdbx')
        tf.write(gdb_script)
        tf.flush()

        hint = "gdb -x %s" % tf.name
        hint += '
use cmdline above to attach gdb then press enter to continue ... '
        input(hint)

    def __str__(self):
        return '<zio target=%s, timeout=%s, io=%s, buffer=%s>' % (self.target, self.timeout, str(self.io), self.buffer)

class SocketIO:
    mode = 'socket'

    def __init__(self, target, timeout=None, debug=None):
        self.timeout = timeout
        self.debug = debug

        if isinstance(target, socket.socket):
            self.sock = target
        else:
            self.sock = socket.create_connection(target, self.timeout)

        self.eof_seen = False
        self.eof_sent = False
        self.exit_code = None

    @property
    def rfd(self):
        return self.sock.fileno()

    @property
    def wfd(self):
        return self.sock.fileno()

    def recv(self, size=None):
        '''
        recv 1 or more available bytes then return
        return None to indicate EOF
        since we use b'' to indicate empty string in case of timeout, so do not return b'' for EOF
        '''
        if size is None:    # socket.recv does not allow None or -1 as argument
            size = 8192
        try:
            b = self.sock.recv(size)
            if self.debug: write_debug(self.debug, b'SocketIO.recv(%r) -> %r' % (size, b))
            if not b:
                self.eof_seen = True
                return None
            return b
        except socket.timeout:
            raise TimeoutError('socket.timeout')    # translate to TimeoutError
        except Exception as ex:
            self.exit_code = 1    # recv exception
            if self.debug: write_debug(self.debug, b'SocketIO.recv(%r) exception: %r' % (size, ex))
            raise

    def send(self, buf):
        try:
            return self.sock.sendall(buf)
        except Exception as ex:
            self.exit_code = 2    # send exception
            if self.debug: write_debug(self.debug, b'SocketIO.send(%r) exception: %r' % (buf, ex))
            raise

    def send_eof(self):
        self.eof_sent = True
        self.sock.shutdown(socket.SHUT_WR)
        if self.debug: write_debug(self.debug, b'SocketIO.send_eof()')

    def interact(self, buffered=None, read_transform=None, write_transform=None, show_input=None, show_output=None, raw_mode=False):
        if show_input is None:
            show_input = not os.isatty(pty.STDIN_FILENO)    # if pty, itself will echo; if pipe, we do echo
        if show_output is None:
            show_output = True

        parent_tty_mode = None
        if os.isatty(pty.STDIN_FILENO) and raw_mode:
            parent_tty_mode = tty.tcgetattr(pty.STDIN_FILENO)   # save mode and restore after interact
            ttyraw(pty.STDIN_FILENO)                            # set to raw mode to pass all input thru, supporting remote apps as htop/vim

        if buffered is not None:
            if read_transform is not None:
                buffered = read_transform(buffered)
            if show_output:
                write_stdout(buffered)

        while not self.is_closed():
            try:
                r, _w, _e = select_ignoring_useless_signal([self.rfd, pty.STDIN_FILENO], [], [])
            except KeyboardInterrupt:
                break
            data = None
            if self.rfd in r:
                data = self.recv(1024)
                if data:
                    if read_transform is not None:
                        data = read_transform(data)
                    if show_output:
                        write_stdout(data)
                else:       # EOF
                    self.eof_seen = True
                    break
            if pty.STDIN_FILENO in r:
                try:
                    data = os.read(pty.STDIN_FILENO, 1024)
                except OSError as e:
                    # the subprocess may have closed before we get to reading it
                    if e.errno != errno.EIO:
                        raise
                if data:
                    if write_transform:
                        data = write_transform(data)
                    if show_input:
                        write_stdout(data)
                    self.send(data)

        if parent_tty_mode:
            tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, parent_tty_mode)

    def close(self):
        self.eof_seen = True
        self.eof_sent = True
        try:
            self.sock.close()
            if self.exit_code is None:
                self.exit_code = 0
        except Exception as ex:
            self.exit_code = 3    # close exception
            if self.debug: write_debug(self.debug, b'SocketIO.close() exception: %r' % ex)
            raise

    def is_closed(self):
        if python_version_major < 3:
            return isinstance(self.sock._sock, socket._closedsocket)    # pylint: disable=no-member
        else:
            return self.sock._closed

    @property
    def exit_status(self):
        return self.exit_code

    def target_pid(self):       # code borrowed from https://github.com/Gallopsled/pwntools to implement gdb attach of local socket
        all_pids = [int(pid) for pid in os.listdir('/proc') if pid.isdigit()]

        def getpid(loc, rem):
            loc = b'%08X:%04X' % (l32(socket.inet_aton(loc[0])), loc[1])
            rem = b'%08X:%04X' % (l32(socket.inet_aton(rem[0])), rem[1])
            inode = None
            with open('/proc/net/tcp', 'rb') as fd:
                for line in fd:
                    line = line.split()
                    if line[1] == loc and line[2] == rem:
                        inode = line[9]
            if inode == None:
                return None
            for pid in all_pids:
                try:
                    for fd in os.listdir('/proc/%d/fd' % pid):
                        fd = os.readlink('/proc/%d/fd/%s' % (pid, fd))
                        m = re.match(r'socket:[(d+)]', fd)
                        if m:
                            this_inode = m.group(1)
                            if this_inode.encode() == inode:
                                return pid
                except:
                    pass

        # Specifically check for socat, since it has an intermediary process
        # if you do not specify "nofork" to the EXEC: argument
        # python(2640) -- socat(2642) -- socat(2643) -- bash(2644)
        def fix_socat(pid):
            if not pid:
                return None
            exe_path = os.readlink('/proc/%d/exe' % pid)
            if os.path.basename(exe_path) == 'socat':
                for p in all_pids:
                    try:
                        with open("/proc/%s/stat" % p, 'rb') as f:
                            data = f.read()
                            rpar = data.rfind(b')')
                            dset = data[rpar + 2:].split()
                            if int(dset[1]) == pid:
                                return int(data.split()[0])
                    except:
                        pass
            return None

        sock = self.sock.getsockname()
        peer = self.sock.getpeername()

        pid = getpid(peer, sock)
        if pid: return fix_socat(pid)

        pid = getpid(sock, peer)
        return fix_socat(pid)

    def __str__(self):
        return '<SocketIO ' + 'self.sock=' + repr(self.sock) + '>'

    def __repr__(self):
        return repr(self.sock)

class ProcessIO:
    mode = 'process'

    def __init__(self, target, timeout=None, debug=None, stdin=PIPE, stdout=TTY_RAW, cwd=None, env=None, sighup=None, write_delay=None, read_echoback=True):
        if os.name == 'nt':
            raise RuntimeError("zio (version %s) process mode does not support windows operation system." % __version__)

        self.timeout = timeout
        self.debug = debug

        self.write_delay = write_delay  # the delay before writing data, pexcept said Linux don't like this to be below 30ms
        self.read_echoback = read_echoback

        self.close_delay = 0.1          # like pexcept, will used by close(), to give kernel time to update process status, time in seconds
        self.terminate_delay = 0.1      # like close_delay

        self.exit_code = None
        self.pid = None

        self.eof_seen = False
        self.eof_sent = False

        # STEP 1: prepare command line args
        if isinstance(target, type('')):
            self.args = shlex.split(target)
        else:
            self.args = list(target)

        executable = find_executable(self.args[0])
        if not executable:
            raise ValueError('unable to find executable in path: %s' % self.args)

        if not os.access(executable, os.X_OK):
            raise RuntimeError('could not execute file without X bit set, please chmod +x %s' % executable)

        self.args[0] = executable

        # STEP 2: create pipes
        if stdout == PIPE:
            stdout_slave_fd, stdout_master_fd = self._pipe_cloexec()    # note: slave, master
        else:
            stdout_master_fd, stdout_slave_fd = pty.openpty()           # note: master, slave

        if stdout_master_fd < 0 or stdout_slave_fd < 0:
            raise RuntimeError('Could not create pipe or openpty for stdout/stderr')

        # use another pty for stdin because we don't want our input to be echoed back in stdout
        # set echo off does not help because in application like ssh, when you input the password
        # echo will be switched on again
        # and dont use os.pipe either, because many thing weired will happen, such as backspace not working, ssh lftp command hang

        stdin_master_fd, stdin_slave_fd = self._pipe_cloexec() if stdin == PIPE else pty.openpty()
        # write_debug(self.debug, b'stdin == %r, stdin_master_fd isatty = %r' % (stdin, os.isatty(stdin_master_fd)))

        if stdin_master_fd < 0 or stdin_slave_fd < 0:
            raise RuntimeError('Could not openpty for stdin')

        # STEP 3: fork and start engine

        gc_enabled = gc.isenabled()
        # Disable gc to avoid bug where gc -> file_dealloc ->
        # write to stderr -> hang.  http://bugs.python.org/issue1336
        gc.disable()
        try:
            self.pid = os.fork()
        except:
            if gc_enabled:
                gc.enable()
            raise

        if self.pid < 0:
            raise RuntimeError('failed to fork')
        elif self.pid == 0:     # Child
            os.close(stdout_master_fd)

            if os.isatty(stdin_slave_fd):
                self.__pty_make_controlling_tty(stdin_slave_fd)
                # self.__pty_make_controlling_tty(stdout_slave_fd)

            try:
                if os.isatty(stdout_slave_fd) and os.isatty(pty.STDIN_FILENO):
                    h, w = self._getwinsize(pty.STDIN_FILENO)
                    self._setwinsize(stdout_slave_fd, h, w)     # note that this may not be successful
            except BaseException as ex:
                if self.debug: write_debug(self.debug, b'[ WARN ] ProcessIO.__init__(%r) setwinsize exception: %r' % (target, ex))

            # Dup fds for child
            def _dup2(a, b):
                # dup2() removes the CLOEXEC flag but
                # we must do it ourselves if dup2()
                # would be a no-op (issue #10806).
                if a == b:
                    self._set_cloexec_flag(a, False)
                elif a is not None:
                    os.dup2(a, b)

            # redirect stdout and stderr to pty
            os.dup2(stdout_slave_fd, pty.STDOUT_FILENO)
            os.dup2(stdout_slave_fd, pty.STDERR_FILENO)

            # redirect stdin to stdin_slave_fd instead of stdout_slave_fd, to prevent input echoed back
            _dup2(stdin_slave_fd, pty.STDIN_FILENO)

            if stdout_slave_fd > 2:
                os.close(stdout_slave_fd)

            if stdin_master_fd is not None:
                os.close(stdin_master_fd)

            # do not allow child to inherit open file descriptors from parent

            max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            os.closerange(3, max_fd)

            # the following line matters, for example, if SIG_DFL specified and sighup sent when exit, the exitcode of child process can be affected to 1
            if sighup is not None:
                # note that, self.signal could only be one of (SIG_IGN, SIG_DFL)
                signal.signal(signal.SIGHUP, sighup)

            if cwd is not None:
                os.chdir(cwd)
            
            if env is None:
                os.execv(executable, self.args)
            else:
                os.execvpe(executable, self.args, env)

            # TODO: add subprocess errpipe to detect child error
            # child exit here, the same as subprocess module do
            os._exit(255)

        else:
            # after fork, parent
            self.wfd = stdin_master_fd
            self.rfd = stdout_master_fd

            if os.isatty(self.wfd):
                # there is no way to eliminate controlling characters in tcattr
                # so we have to set raw mode here now
                self._wfd_init_mode = tty.tcgetattr(self.wfd)[:]
                if stdin == TTY_RAW:
                    ttyraw(self.wfd)
                    self._wfd_raw_mode = tty.tcgetattr(self.wfd)[:]
                else:
                    self._wfd_raw_mode = self._wfd_init_mode[:]

            if os.isatty(self.rfd):
                self._rfd_init_mode = tty.tcgetattr(self.rfd)[:]
                if stdout == TTY_RAW:
                    ttyraw(self.rfd, raw_in = False, raw_out = True)
                    self._rfd_raw_mode = tty.tcgetattr(self.rfd)[:]
                    if self.debug: write_debug(self.debug, b'stdout tty raw mode: %r
' % self._rfd_raw_mode)
                else:
                    self._rfd_raw_mode = self._rfd_init_mode[:]

            os.close(stdin_slave_fd)
            os.close(stdout_slave_fd)
            if gc_enabled:
                gc.enable()

            time.sleep(self.close_delay)

            atexit.register(self._kill, signal.SIGHUP)

    def recv(self, size=None):
        '''
        recv 1 or more available bytes then return
        return None to indicate EOF
        since we use b'' to indicate empty string in case of timeout, so do not return b'' for EOF
        '''
        if size is None:    # os.read does not allow None or -1 as argument
            size = 8192

        timeout = self.timeout

        # Note that some systems such as Solaris do not give an EOF when
        # the child dies. In fact, you can still try to read
        # from the child_fd -- it will block forever or until TIMEOUT.
        # For this case, I test isalive() before doing any reading.
        # If isalive() is false, then I pretend that this is the same as EOF.
        if not self._isalive():
            # timeout of 0 means "poll"
            r, _w, _e = select_ignoring_useless_signal([self.rfd], [], [], 0)
            if not r:
                self.eof_seen = True
                raise EOFError('End Of File (EOF). Braindead platform.')

        if timeout is not None and timeout > 0:
            end_time = time.time() + timeout
        else:
            end_time = float('inf')

        readfds = [self.rfd]

        if self.read_echoback:
            try:
                os.fstat(self.wfd)
                readfds.append(self.wfd)
            except:
                pass

        while True:
            now = time.time()
            if now > end_time:
                raise TimeoutError('Timeout exceeded.')

            if timeout is not None and timeout > 0:
                timeout = end_time - now

            r, _w, _e = select_ignoring_useless_signal(readfds, [], [], timeout)
            if not r:
                if not self._isalive():
                    # Some platforms, such as Irix, will claim that their
                    # processes are alive; timeout on the select; and
                    # then finally admit that they are not alive.
                    self.eof_seen = True
                    raise EOFError('End of File (EOF). Very slow platform.')

            if self.wfd in r:
                try:
                    data = os.read(self.wfd, size)
                    if self.debug: write_debug(self.debug, b'ProcessIO.recv(%r)[wfd=%r] -> %r' % (size, self.wfd, data))
                    if data:
                        return data
                except OSError as err:
                    # wfd read EOF (echo back)
                    pass

            if self.rfd in r:
                try:
                    b = os.read(self.rfd, size)
                    if self.debug: write_debug(self.debug, b'ProcessIO.recv(%r) -> %r' % (size, b))
                    # https://docs.python.org/3/library/os.html#os.read
                    # If the end of the file referred to by fd has been reached, an empty bytes object is returned.
                    if not b:                       # BSD style
                        self.eof_seen = True
                        return None
                    return b
                except OSError as err:
                    if self.debug: write_debug(self.debug, b'ProcessIO.recv(%r) raise OSError %r' % (size, err))
                    if err.errno in (errno.EIO, errno.EBADF):      # Linux does this
                        # EIO:   OSError: [Errno 5] Input/Output Error
                        # EBADF: OSError: [Errno 9] Bad file descriptor
                        self.eof_seen = True
                        return None
                    raise

    def send(self, buf, delay=True):
        if delay:       # prevent write too fast
            time.sleep(self.write_delay)
        if self.debug: write_debug(self.debug, b'ProcessIO.send(%r)' % buf)
        return os.write(self.wfd, buf)

    def send_eof(self, force_close=False):
        self.eof_sent = True

        if not os.isatty(self.wfd):     # pipes can be closed harmlessly
            os.close(self.wfd)

        # for pty, close master fd in Mac won't cause slave fd input/output error, so let's do it!
        elif sys.platform.startswith('darwin'):
            os.close(self.wfd)
        else:       # assume Linux here
            # according to http://linux.die.net/man/3/cfmakeraw
            # set min = 0 and time > 0, will cause read timeout and return 0 to indicate EOF
            # but the tricky thing here is, if child read is invoked before this
            # it will still block forever, so you have to call send_eof before that happens
            mode = tty.tcgetattr(self.wfd)[:]
            mode[tty.CC][tty.VMIN] = 0
            mode[tty.CC][tty.VTIME] = 1
            tty.tcsetattr(self.wfd, tty.TCSAFLUSH, mode)
            if force_close:
                time.sleep(self.close_delay)
                os.close(self.wfd)  # might cause EIO (input/output error)! use force_close at your own risk

    def interact(self, buffered=None, read_transform=None, write_transform=None, show_input=None, show_output=None):
        """
        when stdin is passed using os.pipe, backspace key will not work as expected,
        if wfd is not a tty, then when backspace pressed, I can see that 0x7f is passed, but vim does not delete backwards, so you should choose the right input when using zio
        """
        if show_output is None:
            show_output = True

        # if stdin is in TTY/TTY_RAW, we passthrough to let the inner tty handle everything
        # if wfd is a pipe, we keep parent tty in cooked mode, so line editing still works
        parent_tty_mode = None
        if os.isatty(pty.STDIN_FILENO) and os.isatty(self.wfd):
            parent_tty_mode = tty.tcgetattr(pty.STDIN_FILENO)   # save mode and restore after interact
            ttyraw(pty.STDIN_FILENO)                      # set to raw mode to pass all input thru, supporting apps as vim
            if self.debug: write_debug(self.debug, b'parent tty set to raw mode')

            if show_input is None:
                show_input = True       # do echo from underlying echo back
        else:
            if show_input is None:
                show_input = False      # parent tty in cooked mode and itself has echo back

        if buffered is not None:
            if read_transform is not None:
                buffered = read_transform(buffered)
            if show_output:
                write_stdout(buffered)

        if os.isatty(self.wfd):
            # here, enable cooked mode for process stdin
            # but we should only enable for those who need cooked mode, not stuff like vim
            # we just do a simple detection here
            wfd_mode = tty.tcgetattr(self.wfd)

            if self.debug: write_debug(self.debug, b'wfd now mode = %r
' % wfd_mode)
            if self.debug: write_debug(self.debug, b'wfd raw mode = %r
' % self._wfd_raw_mode)
            if self.debug: write_debug(self.debug, b'wfd ini mode = %r
' % self._wfd_init_mode)

            if wfd_mode == self._wfd_raw_mode:     # if untouched by forked child
                tty.tcsetattr(self.wfd, tty.TCSAFLUSH, self._wfd_init_mode)
                if self.debug: write_debug(self.debug, b'change wfd back to init mode
')
            # but wait, things here are far more complex than that
            # most applications set mode not by setting it to some value, but by flipping some bits in the flags
            # so, if we set wfd raw mode at the beginning, we are unable to set the correct mode here
            # to solve this situation, set stdin = TTY_RAW, but note that you will need to manually escape control characters by prefixing Ctrl-V

        try:
            rfdlist = [self.rfd, pty.STDIN_FILENO]
            if os.isatty(self.wfd):
                # wfd for tty echo
                rfdlist.append(self.wfd)
            while self._isalive():
                if len(rfdlist) == 0:
                    break
                if self.rfd not in rfdlist:
                    break
                try:
                    r, _w, _e = select_ignoring_useless_signal(rfdlist, [], [])
                except KeyboardInterrupt:
                    break

                if self.wfd in r:          # handle tty echo back first if wfd is a tty
                    try:
                        data = None
                        data = os.read(self.wfd, 1024)
                        if self.debug: write_debug(self.debug, b'[ProcessIO.interact] read data from wfd = %r' % data)
                    except OSError as e:
                        if e.errno != errno.EIO:
                            raise
                    if data:
                        if show_input:
                            write_stdout(data)
                    else:
                        rfdlist.remove(self.wfd)
                if self.rfd in r:
                    try:
                        data = None
                        data = os.read(self.rfd, 1024)
                        if self.debug: write_debug(self.debug, b'[ProcessIO.interact] read data from rfd = %r' % data)
                    except OSError as e:
                        if e.errno != errno.EIO:
                            raise
                    if data:
                        if read_transform:
                            data = read_transform(data)
                        if show_output:
                            # now we are in interact mode, so users want to see things in real
                            write_stdout(data)
                    else:
                        rfdlist.remove(self.rfd)
                        self.eof_seen = True
                if pty.STDIN_FILENO in r:
                    try:
                        data = None
                        data = os.read(pty.STDIN_FILENO, 1024)
                    except OSError as e:
                        # the subprocess may have closed before we get to reading it
                        if e.errno != errno.EIO:
                            raise
                    if self.debug and os.isatty(self.wfd):
                        wfd_mode = tty.tcgetattr(self.wfd)
                        if self.debug: write_debug(self.debug, b'stdin wfd mode = %r' % wfd_mode)
                    # in BSD, you can still read '' from rfd, so never use `data is not None` here
                    if data:
                        if self.debug: write_debug(self.debug, b'[ProcessIO.interact] write data = %r' % data)
                        if write_transform:
                            data = write_transform(data)
                        if not os.isatty(self.wfd):
                            if os.isatty(pty.STDIN_FILENO):
                                data = data.replace(b'
', b'
')     # we must do the translation when tty does not help
                            # also echo back by ourselves, now we are echoing things we input by hand
                            if show_input:
                                write_stdout(data)
                        while data != b'' and self._isalive():
                            n = self.send(data, delay=False)
                            data = data[n:]
                    else:
                        self.send_eof(force_close=True)
                        rfdlist.remove(pty.STDIN_FILENO)

            while True:     # read the final buffered output, note that the process probably is not alive, so use while True to read until end (fix pipe stdout interact mode bug)
                r, _w, _e = select_ignoring_useless_signal([self.rfd], [], [], timeout=self.close_delay)
                if self.rfd in r:
                    try:
                        data = None
                        data = os.read(self.rfd, 1024)
                    except OSError as e:
                        if e.errno != errno.EIO:
                            raise
                    # in BSD, you can still read '' from rfd, so never use `data is not None` here
                    if data:
                        if self.debug: write_debug(self.debug, b'[ProcessIO.interact] read remaining data = %r' % data)
                        if read_transform:
                            data = read_transform(data)
                        if show_output:
                            write_stdout(data)
                    else:
                        self.eof_seen = True
                        break
                else:
                    break
        finally:
            if parent_tty_mode:
                tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, parent_tty_mode)
            if os.isatty(self.wfd):
                ttyraw(self.wfd)

    def close(self, force_close=True):
        '''
        close and clean up, nothing can and should be done after closing
        '''
        if self.is_closed():
            return
        try:
            os.close(self.wfd)
        except:
            pass    # may already closed in write_eof
        os.close(self.rfd)
        time.sleep(self.close_delay)
        if self._isalive():
            if not self._terminate(force_close):
                raise RuntimeError('Could not terminate child process')
        self.eof_seen = True
        self.eof_sent = True
        self.rfd = -1
        self.wfd = -1

    def is_closed(self):
        return self.rfd == -1 and self.wfd == -1 and self.eof_sent == True and self.eof_seen == True

    @property
    def exit_status(self):
        if self.exit_code is None:
            self._isalive()     # will modify exit_code if not alive
        return self.exit_code

    def target_pid(self):
        return self.pid

    def __str__(self):
        return '<ProcessIO cmdline=%s>' % (self.args)

    # ---- internal methods ----

    def _kill(self, sig):

        '''This sends the given signal to the child application. In keeping
        with UNIX tradition it has a misleading name. It does not necessarily
        kill the child unless you send the right signal. '''

        # Same as os.kill, but the pid is given for you.
        if self._isalive() and self.pid > 0:
            os.kill(self.pid, sig)

    def _terminate(self, force=False):

        '''This forces a child process to terminate. It starts nicely with
        SIGHUP and SIGINT. If "force" is True then moves onto SIGKILL. This
        returns True if the child was terminated. This returns False if the
        child could not be terminated. '''

        if not self._isalive():
            return True
        try:
            self._kill(signal.SIGHUP)
            time.sleep(self.terminate_delay)
            if not self._isalive():
                return True
            self._kill(signal.SIGCONT)
            time.sleep(self.terminate_delay)
            if not self._isalive():
                return True
            self._kill(signal.SIGINT)        # SIGTERM is nearly identical to SIGINT
            time.sleep(self.terminate_delay)
            if not self._isalive():
                return True
            if force:
                self._kill(signal.SIGKILL)
                time.sleep(self.terminate_delay)
                if not self._isalive():
                    return True
                else:
                    return False
            return False
        except OSError:
            # I think there are kernel timing issues that sometimes cause
            # this to happen. I think isalive() reports True, but the
            # process is dead to the kernel.
            # Make one last attempt to see if the kernel is up to date.
            time.sleep(self.terminate_delay)
            if not self._isalive():
                return True
            else:
                return False

    def _wait(self):

        '''This waits until the child exits. This is a blocking call. This will
        not read any data from the child, so this will block forever if the
        child has unread output and has terminated. In other words, the child
        may have printed output then called exit(), but, the child is
        technically still alive until its output is read by the parent. '''

        if self._isalive():
            _pid, status = os.waitpid(self.pid, 0)
        else:
            raise Exception('Cannot wait for dead child process.')
        self.exit_code = os.WEXITSTATUS(status)
        if os.WIFEXITED(status):
            self.exit_code = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            self.exit_code = os.WTERMSIG(status)
        elif os.WIFSTOPPED(status):
            # You can't call wait() on a child process in the stopped state.
            raise RuntimeError('Called wait() on a stopped child ' +
                    'process. This is not supported. Is some other ' +
                    'process attempting job control with our child pid?')
        return self.exit_code

    def _isalive(self):

        '''This tests if the child process is running or not. This is
        non-blocking. If the child was terminated then this will read the
        exit code or signalstatus of the child. This returns True if the child
        process appears to be running or False if not. It can take literally
        SECONDS for Solaris to return the right status. '''

        if self.exit_code is not None:
            return False

        if self.eof_seen:
            # This is for Linux, which requires the blocking form
            # of waitpid to # get status of a defunct process.
            # This is super-lame. The eof_seen would have been set
            # in recv(), so this should be safe.
            waitpid_options = 0
        else:
            waitpid_options = os.WNOHANG

        try:
            pid, status = os.waitpid(self.pid, waitpid_options)
        except OSError:
            err = sys.exc_info()[1]
            # No child processes
            if err.errno == errno.ECHILD:
                raise RuntimeError('isalive() encountered condition ' +
                        'where "terminated" is 0, but there was no child ' +
                        'process. Did someone else call waitpid() ' +
                        'on our process?')
            else:
                raise err

        # I have to do this twice for Solaris.
        # I can't even believe that I figured this out...
        # If waitpid() returns 0 it means that no child process
        # wishes to report, and the value of status is undefined.
        if pid == 0:
            try:
                ### os.WNOHANG) # Solaris!
                pid, status = os.waitpid(self.pid, waitpid_options)
            except OSError as e:
                # This should never happen...
                if e.errno == errno.ECHILD:
                    raise RuntimeError('isalive() encountered condition ' +
                            'that should never happen. There was no child ' +
                            'process. Did someone else call waitpid() ' +
                            'on our process?')
                else:
                    raise

            # If pid is still 0 after two calls to waitpid() then the process
            # really is alive. This seems to work on all platforms, except for
            # Irix which seems to require a blocking call on waitpid or select,
            # so I let read_nonblocking take care of this situation
            # (unfortunately, this requires waiting through the timeout).
            if pid == 0:
                return True

        if pid == 0:
            return True

        if os.WIFEXITED(status):
            self.exit_code = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            self.exit_code = os.WTERMSIG(status)
        elif os.WIFSTOPPED(status):
            raise RuntimeError('isalive() encountered condition ' +
                    'where child process is stopped. This is not ' +
                    'supported. Is some other process attempting ' +
                    'job control with our child pid?')
        return False

    def __pty_make_controlling_tty(self, tty_fd):
        '''This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris. '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty. Harmless if not already connected.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
        # which exception, shouldnt' we catch explicitly .. ?
        except:
            # Already disconnected. This happens if running inside cron.
            pass

        os.setsid()

        # Verify we are disconnected from controlling tty
        # by attempting to open it again.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
                raise Exception('Failed to disconnect from ' +
                    'controlling tty. It is still possible to open /dev/tty.')
        # which exception, shouldnt' we catch explicitly .. ?
        except:
            # Good! We are disconnected from a controlling tty.
            pass

        # Verify we can open child pty.
        fd = os.open(child_name, os.O_RDWR)
        if fd < 0:
            raise Exception("Could not open child pty, " + child_name)
        else:
            os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        if fd < 0:
            raise Exception("Could not open controlling tty, /dev/tty")
        else:
            os.close(fd)

    def _set_cloexec_flag(self, fd, cloexec=True):
        try:
            cloexec_flag = fcntl.FD_CLOEXEC
        except AttributeError:
            cloexec_flag = 1

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if cloexec:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)

    def _pipe_cloexec(self):
        """Create a pipe with FDs set CLOEXEC."""
        # Pipes' FDs are set CLOEXEC by default because we don't want them
        # to be inherited by other subprocesses: the CLOEXEC flag is removed
        # from the child's FDs by _dup2(), between fork() and exec().
        # This is not atomic: we would need the pipe2() syscall for that.
        r, w = os.pipe()
        self._set_cloexec_flag(r)
        self._set_cloexec_flag(w)
        return w, r

    def _setwinsize(self, fd, rows, cols):   # from pexpect, thanks!

        '''This sets the terminal window size of the child tty. This will cause
        a SIGWINCH signal to be sent to the child. This does not change the
        physical window size. It changes the size reported to TTY-aware
        applications like vi or curses -- applications that respond to the
        SIGWINCH signal. '''

        # Check for buggy platforms. Some Python versions on some platforms
        # (notably OSF1 Alpha and RedHat 7.1) truncate the value for
        # termios.TIOCSWINSZ. It is not clear why this happens.
        # These platforms don't seem to handle the signed int very well;
        # yet other platforms like OpenBSD have a large negative value for
        # TIOCSWINSZ and they don't have a truncate problem.
        # Newer versions of Linux have totally different values for TIOCSWINSZ.
        # Note that this fix is a hack.
        TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
        if TIOCSWINSZ == 2148037735:
            # Same bits, but with sign.
            TIOCSWINSZ = -2146929561
        # Note, assume ws_xpixel and ws_ypixel are zero.
        s = struct.pack('HHHH', rows, cols, 0, 0)
        fcntl.ioctl(fd, TIOCSWINSZ, s)

    def _getwinsize(self, fd):

        '''This returns the terminal window size of the child tty. The return
        value is a tuple of (rows, cols). '''

        TIOCGWINSZ = getattr(termios, 'TIOCGWINSZ', 1074295912)
        s = struct.pack('HHHH', 0, 0, 0, 0)
        x = fcntl.ioctl(fd, TIOCGWINSZ, s)
        return struct.unpack('HHHH', x)[0:2]

# -------------------------------------------------
# =====> command line usage as a standalone app <=====

def usage():
    print("""
usage:

    $ zio [options] cmdline | host port

options:

    -h, --help              help page, you are reading this now!
    -i, --stdin             tty|pipe, specify tty or pipe stdin, default to tty
    -o, --stdout            tty|pipe, specify tty or pipe stdout, default to tty
    -t, --timeout           integer seconds, specify timeout
    -r, --read              how to print out content read from child process, may be RAW(True), NONE(False), REPR, HEX
    -w, --write             how to print out content written to child process, may be RAW(True), NONE(False), REPR, HEX
    -a, --ahead             message to feed into stdin before interact
    -b, --before            don't do anything before reading those input
    -d, --decode            when in interact mode, this option can be used to specify decode function REPR/HEX to input raw hex bytes
    -l, --delay             write delay, time to wait before write

examples:

    $ zio -h
        you are reading this help message

    $ zio [-t seconds] [-i [tty|pipe]] [-o [tty|pipe]] "cmdline -x opts and args"
        spawning process and interact with it

    $ zio [-t seconds] host port
        zio becomes a netcat

    $ zio tty
    $ zio cat
    $ zio vim
    $ zio ssh -p 22 root@127.0.0.1
    $ zio xxd
    $ zio 127.1 22                 # WOW! you can talk with sshd by hand!
    $ zio -i pipe ssh root@127.1   # you must be crazy to do this!
""")

def cmdline(argv):
    import getopt       # use getopt for better compatibility, argparse is not introduced until python2.7
    try:
        opts, args = getopt.getopt(argv, 'hi:o:t:r:w:d:e:a:b:l:', ['help', 'stdin=', 'stdout=', 'timeout=', 'read=', 'write=', 'decode=', 'encode=', 'ahead=', 'before=', 'debug=', 'delay=', 'show-input=', 'show-output='])
    except getopt.GetoptError as err:
        print(str(err))
        usage()
        sys.exit(10)

    kwargs = {
        'stdin': TTY,                     # don't use tty_raw now let's say few people use raw tty in the terminal by hand
        'stdout': TTY,
    }
    decode = None
    encode = None
    show_input = None
    show_output = None
    ahead = None
    before = None
    for o, a in opts:
        if o in ('-h', '--help'):
            usage()
            sys.exit(0)
        elif o in ('-i', '--stdin'):
            if a.lower() == TTY.lower():
                kwargs['stdin'] = TTY
            elif a.lower() == TTY_RAW.lower():
                kwargs['stdin'] = TTY_RAW
            else:
                kwargs['stdin'] = PIPE
        elif o in ('-o', '--stdout'):
            if a.lower() == PIPE.lower():
                kwargs['stdout'] = PIPE
            elif a.lower() == TTY_RAW.lower():
                kwargs['stdout'] = TTY_RAW
            else:
                kwargs['stdout'] = TTY
        elif o in ('-t', '--timeout'):
            try:
                kwargs['timeout'] = int(a)
            except:
                usage()
                sys.exit(11)
        elif o in ('-r', '--read'):
            if a.lower() == 'hex':
                kwargs['print_read'] = COLORED(HEX, 'yellow')
            elif a.lower() == 'repr':
                kwargs['print_read'] = COLORED(REPR, 'yellow')
            elif a.lower() == 'none':
                kwargs['print_read'] = NONE
            else:
                kwargs['print_read'] = RAW
        elif o in ('-w', '--write'):
            if a.lower() == 'hex':
                kwargs['print_write'] = COLORED(HEX, 'cyan')
            elif a.lower() == 'repr':
                kwargs['print_write'] = COLORED(REPR, 'cyan')
            elif a.lower() == 'none':
                kwargs['print_write'] = NONE
            else:
                kwargs['print_write'] = RAW
        elif o in ('-d', '--decode'):
            if a.lower() == 'eval':
                decode = EVAL
            elif a.lower() == 'unhex':
                decode = UNHEX
        elif o in ('-e', '--encode'):
            if a.lower() == 'repr':
                encode = REPR
            elif a.lower() == 'hex':
                encode = HEX
            elif a.lower() == 'bin':
                encode = BIN
        elif o in ('--show-input', ):
            show_input = a.lower() in ('true', '1', 't', 'yes', 'y')
        elif o in ('--show-output', ):
            show_output = a.lower() in ('true', '1', 't', 'yes', 'y')
        elif o in ('-a', '--ahead'):
            ahead = a
        elif o in ('-b', '--before'):
            before = a
        elif o in ('--debug',):
            if os.path.exists(a):
                choice = input('file exists at %s, overwrite(Y/n)?' % a)
                if choice.strip().lower() == 'n':
                    return
            kwargs['debug'] = open(a, 'wb')
        elif o in ('-l', '--delay'):
            kwargs['write_delay'] = float(a)

    target = None
    if len(args) == 2:
        try:
            port = int(args[1])
            if is_hostport_tuple((args[0], port)):
                target = (args[0], port)
        except:
            pass
    if not target:
        if len(args) == 1:
            target = args[0]
        else:
            target = args

    io = zio(target, **kwargs)
    if before:
        io.read_until(before.encode('latin-1'))
    if ahead:
        io.write(ahead.encode('latin-1'))
    io.interact(write_transform=decode, read_transform=encode, show_input=show_input, show_output=show_output)

def main():
    if len(sys.argv) < 2:
        usage()
        sys.exit(0)

    cmdline(sys.argv[1:])

if __name__ == '__main__':
    main()

# -------------------------------------------------
# =====> export useful objects and functions <=====

__all__ = [
    'l8', 'b8', 'l16', 'b16', 'l32', 'b32', 'l64', 'b64', 'convert_packing',
    'colored',
    'match_pattern',
    'write_stdout', 'write_stderr',
    'xor', 'bytes2hex', 'hex2bytes', 'tohex', 'unhex',
    'zio',
    'HEX', 'TOHEX', 'UNHEX', 'EVAL', 'REPR', 'RAW', 'NONE', 'HEXDUMP', 'HEXDUMP_INDENT4', 'HEXDUMP_INDENT8', 'HEXDUMP_INDENT16', 'BIN', 'UNBIN',
    'COLORED',
    'TTY', 'PIPE', 'TTY_RAW',
]

if python_version_major < 3:
    __all__.append('TimeoutError')

# vi:set et ts=4 sw=4 ft=python :
原文地址:https://www.cnblogs.com/CSE-kun/p/14884158.html