bounded_subprocess
Why this library exists
Python's subprocess module is enough until the child process misbehaves.
A buggy program can:
- spawn grandchildren that outlive the parent's
timeout; - block on stdin and never read what you wrote;
- print gigabytes to stdout and exhaust the parent's memory before you ever
call
.communicate().
bounded_subprocess is a small wrapper around subprocess that adds three
hard bounds:
- Process-group cleanup. The child runs in its own session, so on timeout we kill the entire process group — including anything it forked.
- Bounded output capture. We keep at most
max_output_sizebytes from each of stdout and stderr — the prefix by default, the suffix whentail=True— and discard the rest. - Wall-clock timeout. The run has a hard bound while collecting output and
waiting for the child to exit. If you pass
stdin_data, writing stdin has its ownstdin_write_timeout; set it no larger than your desired overall deadline if that matters for your caller.
This is not isolation: the child can still touch the filesystem, the
network, or escape into a new session of its own. If you need isolation,
podman_run runs the same interface inside a container.
The library comes in several flavors, all built on the same primitives:
| Function / class | When to reach for it |
|---|---|
run |
One-shot synchronous call. |
run (async) |
The same, but await-able, and with an optional memory watchdog. |
Interactive |
A long-lived child you talk to line by line. |
podman_run |
Async execution inside a podman container. |
podman_run_stream_lines |
Async line streaming from a podman container. |
Quickstart
Run a command synchronously
from bounded_subprocess import run
result = run(["echo", "hello"], timeout_seconds=5)
print(result.exit_code) # 0
print(result.stdout.strip()) # 'hello'
Run a command asynchronously
import asyncio
from bounded_subprocess.bounded_subprocess_async import run
async def main():
result = await run(
["bash", "-lc", "echo ok; echo err 1>&2"],
timeout_seconds=5,
)
print(result.exit_code, result.stdout.strip(), result.stderr.strip())
asyncio.run(main())
Talk to a long-running child
from bounded_subprocess.interactive import Interactive
proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
proc.write(b"print(1 + 2)\n", timeout_seconds=1)
print(proc.read_line(timeout_seconds=1)) # b'3'
proc.close(nice_timeout_seconds=1)
Run a command in a container
import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run
async def main():
result = await podman_run(
["cat"],
image="alpine:latest",
timeout_seconds=5,
max_output_size=1024,
stdin_data="hello\n",
)
print(result.stdout)
asyncio.run(main())
Stream lines from a container
import asyncio
from contextlib import aclosing
from bounded_subprocess.bounded_subprocess_async import podman_run_stream_lines
async def main():
async with aclosing(podman_run_stream_lines(
["sh", "-c", "printf '%s\n' one two three"],
image="alpine:latest",
timeout_seconds=5,
max_line_size=1024,
)) as lines:
async for line in lines:
print(line)
break
asyncio.run(main())
Each entry point takes plenty of additional knobs (working directory, environment, stdin, memory limit, container volumes, …); see the reference below.
Timing contract for run
timeout_seconds bounds output collection and the final wait for the launched
child process. Two edge cases are worth knowing:
- If
stdin_datais supplied, the write phase is controlled bystdin_write_timeout(default 15 seconds), not preempted bytimeout_seconds. - If a descendant process inherits stdout or stderr, the output pipe can remain
open after the direct child exits. In that case
runmay wait untiltimeout_secondsbefore killing the process group, even if the direct child has already exited.
API reference
Synchronous execution
bounded_subprocess.bounded_subprocess.run
run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, cwd: Optional[str] = None) -> Result
Run a subprocess with a timeout and bounded output capture.
The child runs in its own session, so on timeout we kill the entire
process group, not just the child itself. We read stdout and stderr
nonblockingly and keep at most max_output_size bytes from each — the
prefix by default, or the suffix when tail=True.
On timeout, Result.timeout is True and Result.exit_code is -1.
timeout_seconds bounds output collection and the final wait for the
launched child process. If you pass stdin_data, the stdin write phase is
governed by stdin_write_timeout seconds (default 15), not preempted by
timeout_seconds; if the write cannot finish in that time, we force
exit_code to -1 even when the child exits cleanly.
If a descendant process inherits stdout or stderr, that pipe can remain
open after the direct child exits. In that case this function may wait
until timeout_seconds before killing the process group.
Asynchronous execution
bounded_subprocess.bounded_subprocess_async.run
async
run(args: List[str], timeout_seconds: int = 15, max_output_size: int = 2048, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, memory_limit_mb: Optional[int] = None, memory_watchdog_interval_seconds: float = 1.0, cwd: Optional[str] = None) -> Result
Async counterpart of bounded_subprocess.run, with an optional memory limit.
The child runs in its own session. We read stdout and stderr nonblockingly
and keep at most max_output_size bytes from each (prefix by default;
suffix when tail=True). On timeout, Result.timeout is True and
Result.exit_code is -1.
timeout_seconds bounds output collection and the final wait for the
launched child process. If you pass stdin_data, the stdin write phase is
governed by stdin_write_timeout seconds (default 15), not preempted by
timeout_seconds; if the write cannot finish in that time, we force
exit_code to -1 even when the child exits cleanly.
If a descendant process inherits stdout or stderr, that pipe can remain
open after the direct child exits. In that case this function may wait
until timeout_seconds before killing the process group.
When you set memory_limit_mb, a watchdog polls aggregate peak RSS
(VmHWM from /proc, summed across the process group) every
memory_watchdog_interval_seconds and kills the whole group when usage
exceeds the limit. We accept this non-cgroup approximation deliberately:
it overcounts shared pages and can miss very short-lived children, but
it works without elevated privileges on a typical cluster node.
bounded_subprocess.bounded_subprocess_async.podman_run
async
podman_run(args: List[str], *, image: str, timeout_seconds: int, max_output_size: int, tail: bool = False, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None, entrypoint: Optional[str] = None) -> Result
Run a command inside a podman container, with the same bounds as `run`.
We launch the container with `--rm -i`, track its id through a
`--cidfile`, and force-remove it when the call returns (whether the
command succeeded, timed out, or errored). `volumes` flow through as
`-v` flags, `env` as `-e` flags, and `cwd` as `-w`.
`entrypoint=""` is meaningful: it clears the image's ENTRYPOINT, so
`args` becomes the full command. Passing `None` (the default) omits
the flag entirely and leaves the image's ENTRYPOINT in place.
```python
import asyncio
from bounded_subprocess.bounded_subprocess_async import podman_run
async def main():
result = await podman_run(
["cat"],
image="alpine:latest",
timeout_seconds=5,
max_output_size=1024,
stdin_data="hello
", volumes=["/host/data:/container/data"], cwd="/container/data", ) print(result.exit_code, result.stdout)
asyncio.run(main())
```
bounded_subprocess.bounded_subprocess_async.podman_run_stream_lines
async
podman_run_stream_lines(args: List[str], *, image: str, timeout_seconds: int, max_line_size: int, env=None, stdin_data: Optional[str] = None, stdin_write_timeout: Optional[int] = None, volumes: List[str] = [], cwd: Optional[str] = None, memory_limit_mb: Optional[int] = None, entrypoint: Optional[str] = None, stderr_max_output_size: int = 2048) -> AsyncIterator[str]
Stream stdout lines from a command inside a podman container.
This is the streaming counterpart to podman_run for the case where stdin
is provided all at once as a str, but stdout is consumed incrementally.
The async iterator yields decoded str lines without trailing newlines.
Pending line data is capped at max_line_size bytes; if a line grows past
the cap before a newline arrives, older bytes are discarded.
The container is force-removed in a finally block, so breaking out early
works as long as the async generator is closed by the caller. For
deterministic early termination when storing the generator in a variable,
use contextlib.aclosing or call aclose().
from contextlib import aclosing
from bounded_subprocess.bounded_subprocess_async import podman_run_stream_lines
async with aclosing(podman_run_stream_lines(
["sh", "-c", "printf '%s\n' one two three"],
image="alpine:latest",
timeout_seconds=5,
max_line_size=1024,
)) as lines:
async for line in lines:
print(line)
break
Interactive execution
bounded_subprocess.interactive.Interactive
A long-lived subprocess you can write to and read lines from.
The child runs with nonblocking stdin/stdout pipes. write honors a
timeout; read_line returns one complete line at a time (without the
trailing newline) or None on timeout / EOF.
read_buffer_size caps how many bytes of recent stdout we retain while
waiting for a newline. Lines longer than this lose bytes from the front
— useful when a child spews structured output without ever emitting
\n, but lossy if you actually need to read very long lines.
from bounded_subprocess.interactive import Interactive
proc = Interactive(["python3", "-u", "-c", "print(input())"], read_buffer_size=4096)
proc.write(b"hello\n", timeout_seconds=1)
line = proc.read_line(timeout_seconds=1) # b'hello'
rc = proc.close(nice_timeout_seconds=1)
Spawn the child process. See the class docstring for parameter semantics.
close
Close the pipes, wait up to nice_timeout_seconds for a clean exit,
then SIGKILL the child if it is still running. Returns the child's
exit code, or -9 if we had to kill it.
read_line
Read the next line of stdout (without the trailing newline), or
return None on timeout / EOF.
bounded_subprocess.interactive_async.Interactive
Async counterpart of bounded_subprocess.interactive.Interactive.
Same model — a long-lived child with nonblocking line-oriented I/O — but
write, read_line, and close are coroutines. read_buffer_size
caps retained stdout the same way; lines longer than the buffer lose
bytes from the front.
import asyncio
from bounded_subprocess.interactive_async import Interactive
async def main():
proc = Interactive(["python3", "-iu"], read_buffer_size=4096)
await proc.write(b"print(1 + 2)\n", timeout_seconds=1)
print(await proc.read_line(timeout_seconds=1)) # b'3'
await proc.close(nice_timeout_seconds=1)
asyncio.run(main())
Spawn the child process. See the class docstring for parameter semantics.
close
async
Close the pipes, wait up to nice_timeout_seconds for a clean exit,
then SIGKILL the child if it is still running. Returns the child's
exit code, or -9 if we had to kill it.
read_line
async
Read the next line of stdout (without the trailing newline), or
return None on timeout / EOF.
Result type
bounded_subprocess.util.Result
dataclass
The result of a bounded subprocess run.
stdout and stderr each contain at most max_output_size bytes; we
decode them with errors="ignore". timeout is True only when the
wall-clock deadline elapsed. exit_code holds the child's exit status,
or -1 when the run aborted because of a timeout, a failed stdin write,
or the memory watchdog.