Skip to content

bounded_subprocess

bounded_subprocess

Bounded subprocess execution with timeout and output limits.

This package provides convenient functions for running subprocesses with bounded execution time and output size, with support for both synchronous and asynchronous execution patterns.

bounded_subprocess.bounded_subprocess

Synchronous subprocess execution with bounds on runtime and output size.

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None) -> Result

Run a subprocess with a timeout and bounded stdout/stderr capture.

This helper starts the child in a new session so timeout cleanup can kill the entire process group. Stdout and stderr are read in nonblocking mode and truncated to max_output_size bytes each. If the timeout elapses, the returned Result.timeout is True and Result.exit_code is -1. If stdin_data cannot be fully written before stdin_write_timeout, Result.exit_code is set to -1 even if the process exits normally.

Example:

from bounded_subprocess import run

result = run(
    ["bash", "-lc", "echo ok; echo err 1>&2"],
    timeout_seconds=5,
    max_output_size=1024,
)
print(result.exit_code)
print(result.stdout.strip())
print(result.stderr.strip())
Source code in src/bounded_subprocess/bounded_subprocess.py
def run(
    args: List[str],
    timeout_seconds: int = 15,
    max_output_size: int = 2048,
    env=None,
    stdin_data: Optional[str] = None,
    stdin_write_timeout: Optional[int] = None,
) -> Result:
    """
    Run a subprocess with a timeout and bounded stdout/stderr capture.

    This helper starts the child in a new session so timeout cleanup can kill
    the entire process group. Stdout and stderr are read in nonblocking mode and
    truncated to `max_output_size` bytes each. If the timeout elapses, the
    returned `Result.timeout` is True and `Result.exit_code` is -1. If
    `stdin_data` cannot be fully written before `stdin_write_timeout`,
    `Result.exit_code` is set to -1 even if the process exits normally.

    Example:

    ```python
    from bounded_subprocess import run

    result = run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
        max_output_size=1024,
    )
    print(result.exit_code)
    print(result.stdout.strip())
    print(result.stderr.strip())
    ```
    """
    deadline = time.time() + timeout_seconds

    # start_new_session=True places the child in its own process group so the
    # killpg below also reaps anything the child forked.
    p = subprocess.Popen(
        args,
        env=env,
        stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        start_new_session=True,
        bufsize=MAX_BYTES_PER_READ,
    )
    process_group_id = os.getpgid(p.pid)

    set_nonblocking(p.stdout)
    set_nonblocking(p.stderr)

    # Initialized unconditionally (matching the async implementation) so the
    # exit-code expression below never relies on short-circuit evaluation to
    # avoid referencing an unbound name.
    write_ok = True
    if stdin_data is not None:
        set_nonblocking(p.stdin)
        write_ok = write_nonblocking_sync(
            fd=p.stdin,
            data=stdin_data.encode(),
            timeout_seconds=stdin_write_timeout
            if stdin_write_timeout is not None
            else 15,
        )
        # Close stdin so the child sees EOF; the child may already be gone.
        try:
            p.stdin.close()
        except (BrokenPipeError, BlockingIOError):
            pass

    bufs = read_to_eof_sync(
        [p.stdout, p.stderr],
        timeout_seconds=timeout_seconds,
        max_len=max_output_size,
    )

    # Without this, even the trivial test fails on Linux but not on macOS. It
    # seems possible for (1) both stdout and stderr to close (2) before the child
    # process exits, and we can observe the instant between (1) and (2). So, we
    # need to p.wait and not p.poll.
    try:
        exit_code = p.wait(timeout=max(0, deadline - time.time()))
        is_timeout = False
    except subprocess.TimeoutExpired:
        exit_code = None
        is_timeout = True

    try:
        # Kills the process group. Without this line, test_fork_once fails.
        os.killpg(process_group_id, signal.SIGKILL)
    except ProcessLookupError:
        pass

    # Even if the process terminates normally, if we failed to write everything
    # to stdin, we return -1 as the exit code.
    exit_code = (
        -1 if is_timeout or (stdin_data is not None and not write_ok) else exit_code
    )

    return Result(
        timeout=is_timeout,
        exit_code=exit_code,
        stdout=bufs[0].decode(errors="ignore"),
        stderr=bufs[1].decode(errors="ignore"),
    )

bounded_subprocess.bounded_subprocess_async

Asynchronous subprocess execution with bounds on runtime and output size.

podman_run(args: List[str], *, image: str, timeout_seconds: int, max_output_size: int, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None) -> Result async

Run a subprocess in a podman container asynchronously with bounded stdout/stderr capture.

This function wraps `run` but executes the command inside a podman container.
The container is automatically removed after execution. The interface is otherwise
the same as `run`, except for an additional `image` parameter to specify the
container image to use.

Args:
    args: Command arguments to run in the container.
    image: Container image to use.
    timeout_seconds: Maximum time to wait for the process to complete.
    max_output_size: Maximum size in bytes for stdout/stderr capture.
    env: Optional dictionary of environment variables.
    stdin_data: Optional string data to write to stdin.
    stdin_write_timeout: Optional timeout for writing stdin data.
    volumes: Optional list of volume mount specifications (e.g., ["/host/path:/container/path"]).
    cwd: Optional working directory path inside the container.
    memory_limit_mb: Optional memory limit in megabytes for the container.

Example:

```python
import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run

async def main():
    result = await podman_run(
        ["cat"],
        image="alpine:latest",
        timeout_seconds=5,
        max_output_size=1024,
        stdin_data="hello\n",
        volumes=["/host/data:/container/data"],
        cwd="/container/data",
    )
    print(result.exit_code)
    print(result.stdout.strip())

asyncio.run(main())
```
Source code in src/bounded_subprocess/bounded_subprocess_async.py
async def podman_run(
    args: List[str],
    *,
    image: str,
    timeout_seconds: int,
    max_output_size: int,
    env=None,
    stdin_data: Optional[str] = None,
    stdin_write_timeout: Optional[int] = None,
    volumes: Optional[List[str]] = None,
    cwd: Optional[str] = None,
    memory_limit_mb: Optional[int] = None,
) -> Result:
    """
    Run a subprocess in a podman container asynchronously with bounded stdout/stderr capture.

    This function wraps `run` but executes the command inside a podman container.
    The container is automatically removed after execution. The interface is otherwise
    the same as `run`, except for an additional `image` parameter to specify the
    container image to use.

    Args:
        args: Command arguments to run in the container.
        image: Container image to use.
        timeout_seconds: Maximum time to wait for the process to complete.
        max_output_size: Maximum size in bytes for stdout/stderr capture.
        env: Optional dictionary of environment variables.
        stdin_data: Optional string data to write to stdin.
        stdin_write_timeout: Optional timeout for writing stdin data.
        volumes: Optional list of volume mount specifications
            (e.g., ["/host/path:/container/path"]). Defaults to no mounts.
        cwd: Optional working directory path inside the container.
        memory_limit_mb: Optional memory limit in megabytes for the container.

    Example:

    ```python
    import asyncio
    from bounded_subprocess.bounded_subprocess_async import podman_run

    async def main():
        result = await podman_run(
            ["cat"],
            image="alpine:latest",
            timeout_seconds=5,
            max_output_size=1024,
            stdin_data="hello\n",
            volumes=["/host/data:/container/data"],
            cwd="/container/data",
        )
        print(result.exit_code)
        print(result.stdout.strip())

    asyncio.run(main())
    ```
    """
    # Avoid the shared-mutable-default pitfall: a `volumes=[]` default would be
    # a single list object shared by every call.
    if volumes is None:
        volumes = []

    deadline = time.time() + timeout_seconds

    # Reserve a unique path for podman's --cidfile. NamedTemporaryFile creates
    # the file, but podman refuses to start when the cidfile already exists,
    # so delete the placeholder and let podman create it.
    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, prefix="bounded_subprocess_cid_"
    ) as cidfile:
        cidfile_path = cidfile.name
    try:
        os.unlink(cidfile_path)
    except OSError:
        pass

    # Build podman command
    podman_args = ["podman", "run", "--rm", "-i", "--cidfile", cidfile_path]

    # Convert env dict to -e flags for podman
    if env is not None:
        for key, value in env.items():
            podman_args.extend(["-e", f"{key}={value}"])

    # Handle volume mounts
    for volume in volumes:
        podman_args.extend(["-v", volume])

    # Handle memory limit; --memory-swap is set to the same value so the
    # container cannot swap past the limit.
    if memory_limit_mb is not None:
        podman_args.extend(
            ["--memory", f"{memory_limit_mb}m", "--memory-swap", f"{memory_limit_mb}m"]
        )

    # Handle working directory
    if cwd is not None:
        podman_args.extend(["-w", cwd])

    podman_args.append(image)
    podman_args.extend(args)

    # env reaches the container via the -e flags above; the podman process
    # itself inherits the parent's environment (env=None).
    p = subprocess.Popen(
        podman_args,
        env=None,
        stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=MAX_BYTES_PER_READ,
    )

    set_nonblocking(p.stdout)
    set_nonblocking(p.stderr)

    write_ok = True
    if stdin_data is not None:
        set_nonblocking(p.stdin)
        write_ok = await write_nonblocking_async(
            fd=p.stdin,
            data=stdin_data.encode(),
            timeout_seconds=stdin_write_timeout
            if stdin_write_timeout is not None
            else 15,
        )
        # Close stdin so the child sees EOF; the child may already be gone.
        try:
            p.stdin.close()
        except (BrokenPipeError, BlockingIOError):
            pass

    bufs = await read_to_eof_async(
        [p.stdout, p.stderr],
        timeout_seconds=timeout_seconds,
        max_len=max_output_size,
    )

    # Busy-wait for the process to exit or the deadline. Why do we need this
    # when read_to_eof_async seems to do this? read_to_eof_async will return
    # when the process closes stdout and stderr, but the process can continue
    # running even after that. So, we really need to wait for an exit code.
    exit_code = None
    is_timeout = False
    while True:
        rc = p.poll()
        if rc is not None:
            exit_code = rc
            break
        remaining = deadline - time.time()
        if remaining <= 0:
            is_timeout = True
            break
        await asyncio.sleep(min(0.05, remaining))

    # _podman_rm presumably removes the container recorded in the cidfile.
    # Afterwards, delete the cidfile itself: it was created with delete=False,
    # so without this every call leaks one temp file.
    await _podman_rm(cidfile_path)
    try:
        os.unlink(cidfile_path)
    except OSError:
        pass

    # Even if the container exits normally, report -1 when the stdin write
    # failed or the deadline elapsed.
    exit_code = (
        -1 if is_timeout or (stdin_data is not None and not write_ok) else exit_code
    )
    return Result(
        timeout=is_timeout,
        exit_code=exit_code if exit_code is not None else -1,
        stdout=bufs[0].decode(errors="ignore"),
        stderr=bufs[1].decode(errors="ignore"),
    )

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, memory_limit_mb: Optional[int] = None, memory_watchdog_interval_seconds: float = 1.0) -> Result async

Run a subprocess asynchronously with bounded stdout/stderr capture.

The child process is started in a new session and polled until it exits or the timeout elapses. Stdout and stderr are read in nonblocking mode and truncated to max_output_size bytes each. If the timeout elapses, Result.timeout is True and Result.exit_code is -1. If stdin_data cannot be fully written before stdin_write_timeout, Result.exit_code is set to -1 even if the process exits normally. If memory_limit_mb is set, a watchdog checks aggregate peak RSS (VmHWM, summed across the process group) at a fixed interval and kills the process group when the limit is exceeded.

Example:

import asyncio
from bounded_subprocess.bounded_subprocess_async import run

async def main():
    result = await run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
        max_output_size=1024,
    )
    print(result.exit_code)
    print(result.stdout.strip())
    print(result.stderr.strip())

asyncio.run(main())
Source code in src/bounded_subprocess/bounded_subprocess_async.py
async def run(
    args: List[str],
    timeout_seconds: int = 15,
    max_output_size: int = 2048,
    env=None,
    stdin_data: Optional[str] = None,
    stdin_write_timeout: Optional[int] = None,
    memory_limit_mb: Optional[int] = None,
    memory_watchdog_interval_seconds: float = 1.0,
) -> Result:
    """
    Run a subprocess asynchronously with bounded stdout/stderr capture.

    The child process is started in a new session and polled until it exits or
    the timeout elapses. Stdout and stderr are read in nonblocking mode and
    truncated to `max_output_size` bytes each. If the timeout elapses,
    `Result.timeout` is True and `Result.exit_code` is -1. If `stdin_data`
    cannot be fully written before `stdin_write_timeout`, `Result.exit_code`
    is set to -1 even if the process exits normally. If `memory_limit_mb` is
    set, a watchdog checks aggregate peak RSS (`VmHWM`, summed across the
    process group) at a fixed interval and kills the process group when the
    limit is exceeded.

    Example:

    ```python
    import asyncio
    from bounded_subprocess.bounded_subprocess_async import run

    async def main():
        result = await run(
            ["bash", "-lc", "echo ok; echo err 1>&2"],
            timeout_seconds=5,
            max_output_size=1024,
        )
        print(result.exit_code)
        print(result.stdout.strip())
        print(result.stderr.strip())

    asyncio.run(main())
    ```
    """

    deadline = time.time() + timeout_seconds

    # start_new_session=True places the child in its own process group so the
    # killpg below also reaps anything the child forked.
    p = subprocess.Popen(
        args,
        env=env,
        stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        start_new_session=True,
        bufsize=MAX_BYTES_PER_READ,
    )
    process_group_id = os.getpgid(p.pid)

    set_nonblocking(p.stdout)
    set_nonblocking(p.stderr)

    write_ok = True
    if stdin_data is not None:
        set_nonblocking(p.stdin)
        write_ok = await write_nonblocking_async(
            fd=p.stdin,
            data=stdin_data.encode(),
            timeout_seconds=stdin_write_timeout
            if stdin_write_timeout is not None
            else 15,
        )
        # Close stdin so the child sees EOF; the child may already be gone.
        try:
            p.stdin.close()
        except (BrokenPipeError, BlockingIOError):
            pass

    # Optional background task that enforces memory_limit_mb while the child
    # runs (see _memory_watchdog).
    memory_watchdog_task = None
    if memory_limit_mb is not None:
        memory_watchdog_task = asyncio.create_task(
            _memory_watchdog(
                p=p,
                process_group_id=process_group_id,
                deadline=deadline,
                memory_limit_mb=memory_limit_mb,
                memory_watchdog_interval_seconds=memory_watchdog_interval_seconds,
            )
        )

    bufs = await read_to_eof_async(
        [p.stdout, p.stderr],
        timeout_seconds=timeout_seconds,
        max_len=max_output_size,
    )

    # read_to_eof_async returns once stdout/stderr are closed, but the child
    # can keep running after closing them, so poll until it exits or the
    # deadline passes.
    exit_code = None
    is_timeout = False
    while True:
        rc = p.poll()
        if rc is not None:
            exit_code = rc
            break
        remaining = deadline - time.time()
        if remaining <= 0:
            is_timeout = True
            break
        await asyncio.sleep(min(0.05, remaining))

    # If the watchdog already finished, its result says whether it fired;
    # otherwise cancel it now that the child is done (or timed out).
    memory_limit_exceeded = False
    if memory_watchdog_task is not None:
        if memory_watchdog_task.done():
            memory_limit_exceeded = memory_watchdog_task.result()
        else:
            memory_watchdog_task.cancel()
            try:
                await memory_watchdog_task
            except asyncio.CancelledError:
                pass

    # Kill the whole process group to reap any forked descendants.
    try:
        os.killpg(process_group_id, signal.SIGKILL)
    except ProcessLookupError:
        pass

    # Report -1 whenever anything went wrong (timeout, stdin write failure, or
    # memory limit exceeded), even if the child itself exited normally.
    exit_code = (
        -1
        if is_timeout
        or (stdin_data is not None and not write_ok)
        or memory_limit_exceeded
        else exit_code
    )

    return Result(
        timeout=is_timeout,
        exit_code=exit_code if exit_code is not None else -1,
        stdout=bufs[0].decode(errors="ignore"),
        stderr=bufs[1].decode(errors="ignore"),
    )

bounded_subprocess.interactive

Interactive subprocess wrapper with nonblocking stdin/stdout.

Interactive(args: List[str], read_buffer_size: int)

Interact with a subprocess using nonblocking I/O.

The subprocess is started with pipes for stdin and stdout. Writes are
bounded by a timeout, and reads return complete lines (without the trailing
newline). The internal buffer is capped at `read_buffer_size`; older bytes
are dropped if output grows without line breaks.

Example:

```python
from bounded_subprocess.interactive import Interactive

proc = Interactive(["python3", "-u", "-c", "print(input())"], read_buffer_size=4096)
ok = proc.write(b"hello\n", timeout_seconds=1)
line = proc.read_line(timeout_seconds=1)
rc = proc.close(nice_timeout_seconds=1)
```

Start a subprocess with a bounded stdout buffer.

The child process is created with nonblocking stdin/stdout pipes. The internal read buffer keeps at most read_buffer_size bytes of recent output.

Source code in src/bounded_subprocess/interactive.py
def __init__(self, args: List[str], read_buffer_size: int) -> None:
    """
    Launch the child process with nonblocking stdin/stdout pipes.

    At most `read_buffer_size` bytes of recent stdout are retained in the
    internal buffer; older bytes are dropped.
    """
    state = _InteractiveState(args, read_buffer_size)
    self._state = state

close(nice_timeout_seconds: int) -> int

Close pipes, wait briefly, then kill the subprocess.

Returns the subprocess return code, or -9 if the process is still running when it is killed.

Source code in src/bounded_subprocess/interactive.py
def close(self, nice_timeout_seconds: int) -> int:
    """
    Shut the child down: close its pipes, give it up to
    `nice_timeout_seconds` seconds to exit on its own, then kill it.

    Returns the subprocess return code, or -9 if the process was still
    running when it was killed.
    """
    self._state.close_pipes()
    waited = 0
    # Poll once per second; stop early as soon as the child has exited.
    while waited < nice_timeout_seconds and self._state.poll() is None:
        time.sleep(1)
        waited += 1
    self._state.kill()
    return self._state.return_code()

read_line(timeout_seconds: int) -> Optional[bytes]

Read the next line from stdout, or return None on timeout/EOF.

The returned line does not include the trailing newline byte.

Source code in src/bounded_subprocess/interactive.py
def read_line(self, timeout_seconds: int) -> Optional[bytes]:
    """
    Return the next stdout line (without the trailing newline), or None
    if no complete line arrives before the timeout or the stream hits EOF.
    """
    # A complete line may already be buffered from an earlier read.
    buffered = self._state.pop_line(0)
    if buffered is not None:
        return buffered
    # Nothing buffered and the child has exited: nothing more will arrive.
    if self._state.poll() is not None:
        return None
    give_up_at = time.time() + timeout_seconds
    while time.time() < give_up_at:
        chunk = self._state.read_chunk()
        if chunk is None:
            # Nothing available right now; back off briefly and retry.
            time.sleep(_SLEEP_AFTER_WOUND_BLOCK)
            continue
        if len(chunk) == 0:
            # EOF on stdout.
            return None
        scan_from = len(self._state.stdout_saved_bytes)
        self._state.append_stdout(chunk)
        found = self._state.pop_line(scan_from)
        if found is not None:
            return found
        self._state.trim_stdout()
        time.sleep(_SLEEP_AFTER_WOUND_BLOCK)
    return None

write(stdin_data: bytes, timeout_seconds: int) -> bool

Write stdin_data to the subprocess within timeout_seconds.

Returns False if the subprocess has already exited or if writing fails.

Source code in src/bounded_subprocess/interactive.py
def write(self, stdin_data: bytes, timeout_seconds: int) -> bool:
    """
    Send `stdin_data` to the subprocess, waiting at most `timeout_seconds`.

    Returns False when the subprocess has already exited or when the write
    cannot be completed in time.
    """
    child_alive = self._state.poll() is None
    if not child_alive:
        return False
    return write_loop_sync(
        self._state.write_chunk,
        stdin_data,
        timeout_seconds,
        sleep_interval=_SLEEP_AFTER_WOUND_BLOCK,
    )

bounded_subprocess.util

Utilities for bounded subprocess I/O and nonblocking pipe helpers.

Result(timeout, exit_code, stdout, stderr) dataclass

Result of a bounded subprocess run.

The stdout and stderr fields contain at most the requested number of bytes, decoded with errors ignored. timeout is True only when the overall timeout elapses. When a timeout or stdin write failure occurs, exit_code is -1.

Source code in src/bounded_subprocess/util.py
def __init__(self, timeout, exit_code, stdout, stderr):
    """
    Store the outcome of a bounded subprocess run.

    `timeout` is True only when the overall deadline elapsed; `exit_code`
    is -1 on timeout or stdin-write failure; `stdout`/`stderr` hold the
    (possibly truncated) decoded output.
    """
    self.exit_code = exit_code
    self.timeout = timeout
    self.stderr = stderr
    self.stdout = stdout

can_read(fd) async

Wait until the file descriptor is readable.

Source code in src/bounded_subprocess/util.py
async def can_read(fd):
    """
    Wait until the file descriptor is readable.

    The readiness callback is deregistered in a `finally` block (so
    cancellation also removes it) and guarded with `future.done()`: without
    the guard, a second wakeup delivered before `remove_reader` takes effect
    would call `set_result` on an already-resolved future and raise
    InvalidStateError inside the event loop.
    """
    loop = asyncio.get_running_loop()
    # create_future() uses the running loop's future class, which is the
    # recommended way to make loop-bound futures.
    future = loop.create_future()

    def _on_readable():
        # NOTE(review): defensive — the loop may invoke this again before the
        # reader is removed.
        if not future.done():
            future.set_result(None)

    loop.add_reader(fd, _on_readable)
    try:
        await future
    finally:
        loop.remove_reader(fd)

can_write(fd) async

Wait until the file descriptor is writable.

Source code in src/bounded_subprocess/util.py
async def can_write(fd):
    """
    Wait until the file descriptor is writable.

    The readiness callback is deregistered in a `finally` block (so
    cancellation also removes it) and guarded with `future.done()`: without
    the guard, a second wakeup delivered before `remove_writer` takes effect
    would call `set_result` on an already-resolved future and raise
    InvalidStateError inside the event loop.
    """
    loop = asyncio.get_running_loop()
    # create_future() uses the running loop's future class, which is the
    # recommended way to make loop-bound futures.
    future = loop.create_future()

    def _on_writable():
        # NOTE(review): defensive — the loop may invoke this again before the
        # writer is removed.
        if not future.done():
            future.set_result(None)

    loop.add_writer(fd, _on_writable)
    try:
        await future
    finally:
        loop.remove_writer(fd)

read_to_eof_async(files: list, *, timeout_seconds: int, max_len: int) -> List[bytes] async

Asynchronously read from nonblocking FDs until EOF or timeout.

The returned list preserves the order of the files argument.

Source code in src/bounded_subprocess/util.py
async def read_to_eof_async(
    files: list,
    *,
    timeout_seconds: int,
    max_len: int,
) -> List[bytes]:
    """
    Asynchronously drain nonblocking file descriptors until each reaches
    EOF or the timeout elapses, keeping at most `max_len` bytes per file.

    The returned list preserves the order of the `files` argument.
    """
    deadline = time.time() + timeout_seconds
    pending = list(files)
    collected = {f: bytearray() for f in files}

    while pending:
        time_left = deadline - time.time()
        if time_left <= 0:
            break
        ready = await _wait_for_any_read(pending, max(0, time_left))
        if ready is None:
            break
        try:
            data = ready.read(MAX_BYTES_PER_READ)
        except (BlockingIOError, InterruptedError):
            # Spurious wakeup; nothing to read after all.
            continue
        except OSError:
            # Broken pipe, bad fd, etc.: stop reading from this file.
            pending.remove(ready)
            continue
        if not data:
            # EOF on this file.
            pending.remove(ready)
            continue
        out = collected[ready]
        room = max_len - len(out)
        if room > 0:
            # Keep at most max_len bytes; extra bytes are silently dropped.
            out.extend(data[:room])

    return [bytes(collected[f]) for f in files]

read_to_eof_sync(files: list, *, timeout_seconds: int, max_len: int) -> Optional[List[bytes]]

Read from nonblocking file descriptors until EOF, with limits on how long to wait and the maximum number of bytes to read.

Returns the data read, in the caller-supplied order; if the timeout elapses, the buffers contain whatever was read before the deadline.

Source code in src/bounded_subprocess/util.py
def read_to_eof_sync(
    files: list,
    *,
    timeout_seconds: int,
    max_len: int,
) -> List[bytes]:
    """
    Read from nonblocking file descriptors until EOF, with limits on how long
    to wait and the maximum number of bytes to keep per descriptor.

    Always returns one bytes object per input file, in the caller-supplied
    order. If the timeout elapses before every descriptor reaches EOF, the
    buffers simply contain whatever was read before the deadline; None is
    never returned.
    """
    bufs = {fd: bytearray() for fd in files}
    avail = set(files)
    end_at = time.time() + timeout_seconds

    while avail and time.time() < end_at:
        # Wait only as long as we still have time left
        remaining = max(0, end_at - time.time())
        ready, _, _ = select.select(avail, [], [], remaining)
        if not ready:
            break
        for fd in ready:
            try:
                chunk = fd.read(MAX_BYTES_PER_READ)
                if not chunk:
                    # Reached EOF, so we can stop reading from this file.
                    avail.discard(fd)
                    continue
                the_buf = bufs[fd]
                # Keep at most max_len bytes, silently dropping any extra bytes.
                if len(the_buf) < max_len:
                    keep = max_len - len(the_buf)
                    the_buf.extend(chunk[:keep])
            except (BlockingIOError, InterruptedError):
                # Would-block, so we can't read from this file.
                pass
            except OSError:
                # Broken pipe, bad fd, etc.
                avail.discard(fd)

    # Preserve the caller-supplied order
    return [bytes(bufs[fd]) for fd in files]

set_nonblocking(reader)

Mark a file descriptor as nonblocking.

This is required before using the read/write helpers that rely on nonblocking behavior.

Source code in src/bounded_subprocess/util.py
def set_nonblocking(reader):
    """
    Put `reader`'s underlying file descriptor into nonblocking mode.

    The read/write helpers in this module assume O_NONBLOCK has been set
    before they are called.
    """
    descriptor = reader.fileno()
    current_flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)
    fcntl.fcntl(descriptor, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

write_loop_sync(write_chunk: Callable[[memoryview], tuple[int, bool]], data: bytes, timeout_seconds: float, *, sleep_interval: float) -> bool

Repeatedly write data using write_chunk until complete or timeout.

The write_chunk callback returns (bytes_written, keep_going). If keep_going is False, this function returns False immediately.

Source code in src/bounded_subprocess/util.py
def write_loop_sync(
    write_chunk: Callable[[memoryview], tuple[int, bool]],
    data: bytes,
    timeout_seconds: float,
    *,
    sleep_interval: float,
) -> bool:
    """
    Drive `write_chunk` until all of `data` is written, the callback asks to
    stop, or `timeout_seconds` elapses.

    `write_chunk` receives the unwritten tail as a memoryview and returns
    `(bytes_written, keep_going)`; a False `keep_going` aborts immediately.
    Returns True only when every byte was written.
    """
    view = memoryview(data)
    total = len(view)
    offset = 0
    began = time.time()
    while offset < total:
        count, keep_going = write_chunk(view[offset:])
        offset += count
        if not keep_going:
            return False
        if offset >= total:
            break
        if time.time() - began > timeout_seconds:
            return False
        time.sleep(sleep_interval)
    return True

write_nonblocking_async(*, fd, data: bytes, timeout_seconds: int) -> bool async

Writes to a nonblocking file descriptor with the timeout.

Returns True if all the data was written. False indicates that there was either a timeout or a broken pipe.

This function does not close the file descriptor.

Source code in src/bounded_subprocess/util.py
async def write_nonblocking_async(*, fd, data: bytes, timeout_seconds: int) -> bool:
    """
    Write `data` to the nonblocking `fd`, waiting at most `timeout_seconds`.

    Returns True when every byte was written; False on timeout or broken
    pipe. The file descriptor is left open.
    """
    began = time.time()
    # memoryview slices do not copy, unlike data[start:].
    view = memoryview(data)
    offset = 0
    while offset < len(view):
        try:
            # Write as much as possible without blocking.
            count = fd.write(view[offset:])
            offset += count or 0
            continue
        except BrokenPipeError:
            return False
        except BlockingIOError as err:
            # Only EAGAIN is retryable; anything else is a hard failure.
            if err.errno != errno.EAGAIN:
                return False
            # A partial write may have happened before the block.
            offset += err.characters_written
        # The fd would block: wait for writability within the remaining budget.
        budget = timeout_seconds - (time.time() - began)
        if budget <= 0:
            return False
        try:
            await asyncio.wait_for(can_write(fd), budget)
        except asyncio.TimeoutError:
            return False
    return True

write_nonblocking_sync(*, fd, data: bytes, timeout_seconds: int) -> bool

Writes to a nonblocking file descriptor with the timeout.

Returns True if all the data was written. False indicates that there was either a timeout or a broken pipe.

This function does not close the file descriptor.

Source code in src/bounded_subprocess/util.py
def write_nonblocking_sync(*, fd, data: bytes, timeout_seconds: int) -> bool:
    """
    Write `data` to the nonblocking `fd`, waiting at most `timeout_seconds`.

    Returns True when every byte was written; False on timeout or broken
    pipe. The file descriptor is left open.
    """
    began = time.time()
    # memoryview slices do not copy, unlike data[start:].
    view = memoryview(data)
    offset = 0
    while offset < len(view):
        try:
            # Write as much as possible without blocking.
            count = fd.write(view[offset:])
            offset += count or 0
            continue
        except BrokenPipeError:
            return False
        except BlockingIOError as err:
            # Only EAGAIN is retryable; anything else is a hard failure.
            if err.errno != errno.EAGAIN:
                return False
            # A partial write may have happened before the block.
            offset += err.characters_written
        # The fd would block: wait for writability within the remaining budget.
        budget = timeout_seconds - (time.time() - began)
        if budget <= 0:
            return False
        _, writable, _ = select.select([], [fd], [], budget)
        if not writable:
            # Deadline elapsed, so abort.
            return False
    return True