Skip to content

bounded_subprocess

Why this library exists

Python's subprocess module is enough until the child process misbehaves. A buggy program can:

  • spawn grandchildren that outlive the parent's timeout;
  • block on stdin and never read what you wrote;
  • print gigabytes to stdout and exhaust the parent's memory before you ever call .communicate().

bounded_subprocess is a small wrapper around subprocess that adds three hard bounds:

  1. Process-group cleanup. The child runs in its own session, so on timeout we kill the entire process group — including anything it forked.
  2. Bounded output capture. We keep at most max_output_size bytes from each of stdout and stderr — the prefix by default, the suffix when tail=True — and discard the rest.
  3. Wall-clock timeout. The run has a hard bound while collecting output and waiting for the child to exit. If you pass stdin_data, writing stdin has its own stdin_write_timeout; set it no larger than your desired overall deadline if that matters for your caller.

This is not isolation: the child can still touch the filesystem, the network, or escape into a new session of its own. If you need isolation, podman_run runs the same interface inside a container.

The library comes in several flavors, all built on the same primitives:

Function / class When to reach for it
run One-shot synchronous call.
run (async) The same, but await-able, and with an optional memory watchdog.
Interactive A long-lived child you talk to line by line.
podman_run Async execution inside a podman container.
podman_run_stream_lines Async line streaming from a podman container.

Quickstart

Run a command synchronously

from bounded_subprocess import run

result = run(["echo", "hello"], timeout_seconds=5)
print(result.exit_code)        # 0
print(result.stdout.strip())   # 'hello'

Run a command asynchronously

import asyncio
from bounded_subprocess.bounded_subprocess_async import run

async def main():
    result = await run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
    )
    print(result.exit_code, result.stdout.strip(), result.stderr.strip())

asyncio.run(main())

Talk to a long-running child

from bounded_subprocess.interactive import Interactive

proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
proc.write(b"print(1 + 2)\n", timeout_seconds=1)
print(proc.read_line(timeout_seconds=1))  # b'3'
proc.close(nice_timeout_seconds=1)

Run a command in a container

import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run

async def main():
    result = await podman_run(
        ["cat"],
        image="alpine:latest",
        timeout_seconds=5,
        max_output_size=1024,
        stdin_data="hello\n",
    )
    print(result.stdout)

asyncio.run(main())

Stream lines from a container

import asyncio
from contextlib import aclosing
from bounded_subprocess.bounded_subprocess_async import podman_run_stream_lines

async def main():
    async with aclosing(podman_run_stream_lines(
        ["sh", "-c", "printf '%s\n' one two three"],
        image="alpine:latest",
        timeout_seconds=5,
        max_line_size=1024,
    )) as lines:
        async for line in lines:
            print(line)
            break

asyncio.run(main())

Each entry point takes plenty of additional knobs (working directory, environment, stdin, memory limit, container volumes, …); see the reference below.

Timing contract for run

timeout_seconds bounds output collection and the final wait for the launched child process. Two edge cases are worth knowing:

  • If stdin_data is supplied, the write phase is controlled by stdin_write_timeout (default 15 seconds), not preempted by timeout_seconds.
  • If a descendant process inherits stdout or stderr, the output pipe can remain open after the direct child exits. In that case run may wait until timeout_seconds before killing the process group, even if the direct child has already exited.

API reference

Synchronous execution

bounded_subprocess.bounded_subprocess.run

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, cwd: Optional[str] = None) -> Result

Run a subprocess with a timeout and bounded output capture.

The child runs in its own session, so on timeout we kill the entire process group, not just the child itself. We read stdout and stderr nonblockingly and keep at most max_output_size bytes from each — the prefix by default, or the suffix when tail=True.

On timeout, Result.timeout is True and Result.exit_code is -1. timeout_seconds bounds output collection and the final wait for the launched child process. If you pass stdin_data, the stdin write phase is governed by stdin_write_timeout seconds (default 15), not preempted by timeout_seconds; if the write cannot finish in that time, we force exit_code to -1 even when the child exits cleanly.

If a descendant process inherits stdout or stderr, that pipe can remain open after the direct child exits. In that case this function may wait until timeout_seconds before killing the process group.

from bounded_subprocess import run

result = run(
    ["bash", "-lc", "echo ok; echo err 1>&2"],
    timeout_seconds=5,
    max_output_size=1024,
)
print(result.exit_code, result.stdout.strip(), result.stderr.strip())

Asynchronous execution

bounded_subprocess.bounded_subprocess_async.run async

run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, memory_limit_mb: Optional[int] = None, memory_watchdog_interval_seconds: float = 1.0, cwd: Optional[str] = None) -> Result

Async counterpart of bounded_subprocess.run, with an optional memory limit.

The child runs in its own session. We read stdout and stderr nonblockingly and keep at most max_output_size bytes from each (prefix by default; suffix when tail=True). On timeout, Result.timeout is True and Result.exit_code is -1.

timeout_seconds bounds output collection and the final wait for the launched child process. If you pass stdin_data, the stdin write phase is governed by stdin_write_timeout seconds (default 15), not preempted by timeout_seconds; if the write cannot finish in that time, we force exit_code to -1 even when the child exits cleanly.

If a descendant process inherits stdout or stderr, that pipe can remain open after the direct child exits. In that case this function may wait until timeout_seconds before killing the process group.

When you set memory_limit_mb, a watchdog polls aggregate peak RSS (VmHWM from /proc, summed across the process group) every memory_watchdog_interval_seconds and kills the whole group when usage exceeds the limit. We accept this non-cgroup approximation deliberately: it overcounts shared pages and can miss very short-lived children, but it works without elevated privileges on a typical cluster node.

import asyncio
from bounded_subprocess.bounded_subprocess_async import run

async def main():
    result = await run(
        ["bash", "-lc", "echo ok; echo err 1>&2"],
        timeout_seconds=5,
        max_output_size=1024,
    )
    print(result.exit_code, result.stdout.strip(), result.stderr.strip())

asyncio.run(main())

bounded_subprocess.bounded_subprocess_async.podman_run async

podman_run(args: List[str], *, image: str, timeout_seconds: int, max_output_size: int, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None, entrypoint: Optional[str] = None) -> Result
Run a command inside a podman container, with the same bounds as `run`.

We launch the container with `--rm -i`, track its id through a
`--cidfile`, and force-remove it when the call returns (whether the
command succeeded, timed out, or errored). `volumes` flow through as
`-v` flags, `env` as `-e` flags, and `cwd` as `-w`.

`entrypoint=""` is meaningful: it clears the image's ENTRYPOINT, so
`args` becomes the full command. Passing `None` (the default) omits
the flag entirely and leaves the image's ENTRYPOINT in place.

```python
import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run

async def main():
    result = await podman_run(
        ["cat"],
        image="alpine:latest",
        timeout_seconds=5,
        max_output_size=1024,
        stdin_data="hello

", volumes=["/host/data:/container/data"], cwd="/container/data", ) print(result.exit_code, result.stdout)

asyncio.run(main())
```

bounded_subprocess.bounded_subprocess_async.podman_run_stream_lines async

podman_run_stream_lines(args: List[str], *, image: str, timeout_seconds: int, max_line_size: int, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None, entrypoint: Optional[str] = None, stderr_max_output_size: int = 2048) -> AsyncIterator[str]

Stream stdout lines from a command inside a podman container.

This is the streaming counterpart to podman_run for the case where stdin is provided all at once as a str, but stdout is consumed incrementally. The async iterator yields decoded str lines without trailing newlines. Pending line data is capped at max_line_size bytes; if a line grows past the cap before a newline arrives, older bytes are discarded.

The container is force-removed in a finally block, so breaking out early works as long as the async generator is closed by the caller. For deterministic early termination when storing the generator in a variable, use contextlib.aclosing or call aclose().

from contextlib import aclosing
from bounded_subprocess.bounded_subprocess_async import podman_run_stream_lines

async with aclosing(podman_run_stream_lines(
    ["sh", "-c", "printf '%s\n' one two three"],
    image="alpine:latest",
    timeout_seconds=5,
    max_line_size=1024,
)) as lines:
    async for line in lines:
        print(line)
        break

Interactive execution

bounded_subprocess.interactive.Interactive

Interactive(args: List[str], read_buffer_size: int, cwd: Optional[str] = None)

A long-lived subprocess you can write to and read lines from.

The child runs with nonblocking stdin/stdout pipes. write honors a timeout; read_line returns one complete line at a time (without the trailing newline) or None on timeout / EOF.

read_buffer_size caps how many bytes of recent stdout we retain while waiting for a newline. Lines longer than this lose bytes from the front — useful when a child spews structured output without ever emitting \n, but lossy if you actually need to read very long lines.

from bounded_subprocess.interactive import Interactive

proc = Interactive(["python3", "-u", "-c", "print(input())"], read_buffer_size=4096)
proc.write(b"hello\n", timeout_seconds=1)
line = proc.read_line(timeout_seconds=1)   # b'hello'
rc = proc.close(nice_timeout_seconds=1)

Spawn the child process. See the class docstring for parameter semantics.

close

close(nice_timeout_seconds: int) -> int

Close the pipes, wait up to nice_timeout_seconds for a clean exit, then SIGKILL the child if it is still running. Returns the child's exit code, or -9 if we had to kill it.

read_line

read_line(timeout_seconds: int) -> Optional[bytes]

Read the next line of stdout (without the trailing newline), or return None on timeout / EOF.

write

write(stdin_data: bytes, timeout_seconds: int) -> bool

Write stdin_data to the child within the timeout. Returns False if the child already exited or the write failed (e.g. broken pipe).

bounded_subprocess.interactive_async.Interactive

Interactive(args: List[str], read_buffer_size: int, cwd: Optional[str] = None)

Async counterpart of bounded_subprocess.interactive.Interactive.

Same model — a long-lived child with nonblocking line-oriented I/O — but write, read_line, and close are coroutines. read_buffer_size caps retained stdout the same way; lines longer than the buffer lose bytes from the front.

import asyncio
from bounded_subprocess.interactive_async import Interactive

async def main():
    proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
    await proc.write(b"print(1 + 2)\n", timeout_seconds=1)
    print(await proc.read_line(timeout_seconds=1))   # b'3'
    await proc.close(nice_timeout_seconds=1)

asyncio.run(main())

Spawn the child process. See the class docstring for parameter semantics.

close async

close(nice_timeout_seconds: int) -> int

Close the pipes, wait up to nice_timeout_seconds for a clean exit, then SIGKILL the child if it is still running. Returns the child's exit code, or -9 if we had to kill it.

read_line async

read_line(timeout_seconds: int) -> Optional[bytes]

Read the next line of stdout (without the trailing newline), or return None on timeout / EOF.

write async

write(stdin_data: bytes, timeout_seconds: int) -> bool

Write stdin_data to the child within the timeout. Returns False if the child already exited or the write failed.

Result type

bounded_subprocess.util.Result dataclass

Result(timeout, exit_code, stdout, stderr)

The result of a bounded subprocess run.

stdout and stderr each contain at most max_output_size bytes; we decode them with errors="ignore". timeout is True only when the wall-clock deadline elapsed. exit_code holds the child's exit status, or -1 when the run aborted because of a timeout, a failed stdin write, or the memory watchdog.