Abstractions

abstractions.async_abstractions

This module contains abstractions for async programming.

run_bounded(tasks: AsyncIterator[Awaitable[T]], limit: int) -> AsyncIterator[Awaitable[T]] async

There are many situations where we want to run lots of tasks with a bounded amount of concurrency. For example, we may want to make thousands of requests to an API, but not run more than a few requests concurrently. We may also want to run thousands of containerized jobs on a machine that has multiple cores, but not run too many containers concurrently.

The simplest way to achieve this is with a semaphore:

sema = asyncio.Semaphore(limit)

async def the_task(t):
    async with sema:
        ...  # Do the task.

await asyncio.gather(*(the_task(t) for t in tasks))

However, this approach does not address another common problem: preparing the tasks themselves can take a lot of time and memory. For example, if each task is sourced from a line of a file with thousands of lines, we are forced to read the entire file and create every coroutine before any task starts running.

The run_bounded function addresses this problem. You can use it to write code such as the following:

async def task(line):
    ...  # Do the task.

async def task_gen():
    with open("tasks.txt") as f:
        for line in f:
            yield task(line)

async for result in run_bounded(task_gen(), limit=10):
    print(await result)

The code above runs at most 10 tasks concurrently while streaming the file line by line. Each item yielded is a completed future: awaiting it returns the task's result or re-raises its exception. The results arrive out of order.
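
Because the results arrive out of order, it is often convenient for each task to return its input alongside its output, so the caller can tell which line produced which result. Below is a minimal sketch of that pattern; the file name and the per-line work are placeholders, and the import path is assumed from the module name above.

import asyncio

from abstractions.async_abstractions import run_bounded

async def task(line):
    # Stand-in for the real work; return the input together with the result.
    return line, len(line)

async def task_gen():
    with open("tasks.txt") as f:
        for line in f:
            yield task(line)

async def main():
    async for fut in run_bounded(task_gen(), limit=10):
        line, result = await fut
        print(line.rstrip(), result)

asyncio.run(main())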

Source code in src/abstractions/async_abstractions.py
async def run_bounded(
    tasks: AsyncIterator[Awaitable[T]], limit: int
) -> AsyncIterator[Awaitable[T]]:
    """
    There are many situations where we want to run lots of tasks with a bounded
    amount of concurrency. For example, we may want to make thousands of
    requests to an API, but not run more than a few requests concurrently.
    We may also want to run thousands of containerized jobs on a machine that
    has multiple cores, but not run too many containers concurrently.

    The simplest way to achieve this is with a semaphore:

    ```python
    sema = asyncio.Semaphore(limit)

    async def the_task(t):
        async with sema:
            ...  # Do the task.

    await asyncio.gather(*(the_task(t) for t in tasks))
    ```

    However, this approach does not address another common problem: preparing
    the tasks themselves can take a lot of time and memory. For example, if
    each task is sourced from a line of a file with thousands of lines, we are
    forced to read the entire file and create every coroutine before any task
    starts running.

    The `run_bounded` function addresses this problem. You can use it to
    write code such as the following:

    ```python
    async def task(line):
        ...  # Do the task.

    async def task_gen():
        with open("tasks.txt") as f:
            for line in f:
                yield task(line)

    async for result in run_bounded(task_gen(), limit=10):
        print(await result)
    ```

    The code above runs at most 10 tasks concurrently while streaming the file
    line by line. Each item yielded is a completed future: awaiting it returns
    the task's result or re-raises its exception. The results arrive
    **out of order**.
    """

    # A unique object that we put on a queue. When a task receives this, it
    # will shut down and stop receiving more items. In Python,
    # ({ } is { }) == False.
    complete_sentinel = {}

    task_q = asyncio.Queue(limit)
    result_q = asyncio.Queue()
    # Why do we need a semaphore, when the .put and .get methods on the queue
    # already block? We want to not pull the next task from tasks until we have
    # a consumer ready to run it immediately.
    #
    # This semaphore tracks the number of consumers waiting to get tasks from
    # the queue. The producer acquires (decrement) the semaphore immediately
    # before fetching a new task from the tasks generator. Each consumer
    # releases (increments) the semaphore immediately before it waits to .get()
    # a task from the queue. We initialize it to zero because at this point, no
    # consumers have been created.
    consumer_sema = asyncio.Semaphore(0)

    async def producer():
        # Fetch tasks and put them on the queue. Blocks when the queue hits
        # limit, which limits the rate at which we fetch tasks, and also ensures
        # that we do not fetch all tasks at once.
        await consumer_sema.acquire()
        async for coroutine in tasks:
            await task_q.put(coroutine)
            await consumer_sema.acquire()  # For the next iteration

        # Each consumer will receive a sentinel and shut down.
        for _ in range(limit):
            await task_q.put(complete_sentinel)

    async def consumer():
        # We create _limit_ consumers, which limits how many tasks run
        # concurrently.
        while True:
            consumer_sema.release()
            coroutine = await task_q.get()
            if coroutine is complete_sentinel:
                break
            fut = asyncio.Future()
            try:
                fut.set_result(await coroutine)
            except Exception as exn:
                fut.set_exception(exn)
            await result_q.put(fut)

    async def gen_results():
        while True:
            r = await result_q.get()
            if r is complete_sentinel:
                break
            yield r

    # Main body
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        workers = [tg.create_task(consumer()) for _ in range(limit)]

        # Send a sentinel to shut down gen_results after all workers finish.
        async def send_complete():
            await asyncio.gather(*workers)
            await result_q.put(complete_sentinel)

        tg.create_task(send_complete())

        async for item in gen_results():
            yield item

abstractions.iterables

This module contains abstractions for iterables.

batches(iterable: Iterable[T], *, batch_size: int, epochs: int) -> Iterator[List[T]]

Yields batches of items, for the given number of epochs.

The final batch may have fewer than `batch_size` items. At an epoch boundary, a batch may contain items from two successive epochs.
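
For example, here is a minimal sketch with a six-item dataset, a batch size of 4, and two epochs; the import path is assumed from the module name.

from abstractions.iterables import batches

# The second batch straddles the epoch boundary: it ends epoch 1 and starts epoch 2.
for batch in batches(range(6), batch_size=4, epochs=2):
    print(batch)
# [0, 1, 2, 3]
# [4, 5, 0, 1]
# [2, 3, 4, 5]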

Source code in src/abstractions/iterables.py
def batches(
    iterable: Iterable[T], *, batch_size: int, epochs: int
) -> Iterator[List[T]]:
    """
    Yields batches of items, for the given number of epochs.

    The final batch may have fewer than `batch_size` items. At an epoch
    boundary, a batch may contain items from two successive epochs.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be greater than 0")
    if epochs <= 0:
        raise ValueError("epochs must be greater than 0")

    batch = []
    dataset_len = None
    for i in range(epochs):
        for j, item in enumerate(iterable):
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if dataset_len is None:
            dataset_len = j + 1
        elif j + 1 < dataset_len:
            raise ValueError(
                f"First epoch had {dataset_len} items, but second epoch had {j + 1} items"
            )

    if batch:
        yield batch

abstractions.storage

This module contains abstractions for data storage.

create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, on_error: OnError) async

An abstraction to help persist generated data to a JSONL file that supports resuming from an interrupted run.

The goal is to produce a JSONL file where each line has the shape:

{ key_name: value, ... }

And each value appears exactly key_count times. To use this function, the caller must be able to generate the list of expected keys with key_generator, and then produce each row with value_generator.

The value_generator receives a list of (value, count) tuples, and must produce a row with the shape { key_name: value, ... } exactly count times.
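
Below is a minimal usage sketch; the prompts, the make_row coroutine, and the file name are hypothetical stand-ins, and the import path is assumed from the module name.

import asyncio
from pathlib import Path

from abstractions.storage import create_or_resume_jsonl_file

async def make_row(prompt: str) -> dict:
    # Stand-in for the real work, e.g. calling an API.
    return {"prompt": prompt, "completion": prompt.upper()}

async def value_generator(value_counts):
    # Yield an awaitable row exactly `count` times for each value.
    for prompt, count in value_counts:
        for _ in range(count):
            yield make_row(prompt)

async def main():
    await create_or_resume_jsonl_file(
        file_name=Path("completions.jsonl"),
        key_name="prompt",
        key_count=3,
        key_generator=iter(["hello", "world"]),
        value_generator=value_generator,
        on_error="raise",
    )

asyncio.run(main())

If the run is interrupted, re-running the same call scans completions.jsonl and generates only the rows that are still missing.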

Source code in src/abstractions/storage.py
async def create_or_resume_jsonl_file(
    file_name: Path,
    key_name: str,
    key_count: int,
    key_generator: Iterator[str],
    value_generator: RowGenerator,
    *,
    on_error: OnError,
):
    """
    An abstraction to help persist generated data to a JSONL file that supports
    resuming from an interrupted run.

    The goal is to produce a JSONL file where each line has the shape:

    ```
    { key_name: value, ... }
    ```

    And each `value` appears exactly `key_count` times. To use this function,
    the caller must be able to generate the list of expected keys with
    `key_generator`, and then produce each row with `value_generator`.

    The `value_generator` receives a list of `(value, count)` tuples, and must
    produce a row with the shape `{ key_name: value, ... }` exactly `count` times.
    """
    if not file_name.exists():
        # Handle the trivial case with trivial code.
        with file_name.open("wt") as f:
            all_values = [(k, key_count) for k in key_generator]
            async for value in value_generator(all_values):
                json.dump(await value, f)
                f.write("\n")
                f.flush()
        return

    # Pass through the file: we compute how many keys need to be generated.
    values_needed = {k: key_count for k in key_generator}
    with file_name.open("rt") as f:
        for line in f:
            data = json.loads(line)
            this_value = data[key_name]
            if this_value not in values_needed:
                _error(
                    on_error,
                    f"{file_name} has {this_value}, but key_generator does not",
                )
                continue

            this_value_count = values_needed[this_value]
            if this_value_count == 0:
                _error(
                    on_error,
                    f"{file_name} has more entries for {this_value} than key_generator demands",
                )
                continue

            values_needed[this_value] = values_needed[this_value] - 1

    # Not significant, but note that all values_needed may map to 0, in which
    # case the loop below will be trivial.
    with file_name.open("at") as f:
        all_values = [(k, n) for k, n in values_needed.items() if n > 0]
        async for value in value_generator(all_values):
            json.dump(await value, f)
            f.write("\n")
            f.flush()

map_by_key_jsonl_file(src: Path, dst: Path, f: Callable[[dict], Awaitable[dict]], *, key: str, num_concurrent: int, keep_columns: List[str], on_error: OnError, progress: Optional[Callable[[], None]] = None) async

Apply an async transformation to exactly one representative row from each equivalence class in src (rows that share key), writing a row for every line in src to dst.

Parameters
  • src, dst : Path: source and destination JSONL files.
  • f : Callable[[dict], Awaitable[dict]]: async function invoked once per distinct value of key.
  • key : str: column whose value defines the equivalence classes.
  • num_concurrent : int: maximum number of concurrent invocations of f.
  • keep_columns : List[str]: columns to copy verbatim from src to each output row.
  • on_error : Literal["print", "raise"]: how to handle inconsistencies while resuming.
  • progress: an optional function that is called after each row is processed, even if f does not receive the row. You can use this to display a progress bar.
Behaviour
  • The first time a key is encountered, that row is passed to f; its result is cached and duplicated for every row in the same class.

  • If dst already exists, rows are read to determine which keys are complete so the computation can resume without repeating work.

Example

Consider the following input file:

{ "key": 10, "other": "A", "discarded": "X" }
{ "key": 20, "other": "B", "discarded": "Y" }
{ "key": 10, "other": "C", "discarded": "Z" }

Consider the following application (inside an async context):

async def compute(row):
    return { "result": row["key"] + 1, "key": row["key"] }

await map_by_key_jsonl_file(
    src,
    dst,
    f=compute,
    key="key",
    keep_columns=["other"],
    on_error="raise",
    num_concurrent=1,
)

Because the file contains two rows whose key is 10 and one whose key is 20, f is called twice: once with the row with key 10 and once with the row with key 20.

The output file dst will be a permutation of:

{ "key": 10, "result": 11, "other": "A" }
{ "key": 20, "result": 21, "other": "B" }
{ "key": 10, "result": 11, "other": "C" }

It omits the discarded column. We always emit the key column. We emit the column other because it was specified in the keep_columns argument. Finally, we include all the columns from f's output.
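
The progress callback is invoked once per input row, even when f is not called for that row, which makes it a natural fit for a progress bar. Here is a minimal sketch, inside an async context, reusing src, dst, and compute from the example above and assuming tqdm is installed.

from tqdm import tqdm

with src.open("rt") as f_src:
    total_rows = sum(1 for _ in f_src)

with tqdm(total=total_rows) as pbar:
    await map_by_key_jsonl_file(
        src,
        dst,
        f=compute,
        key="key",
        keep_columns=["other"],
        on_error="raise",
        num_concurrent=1,
        progress=pbar.update,
    )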

Source code in src/abstractions/storage.py
async def map_by_key_jsonl_file(
    src: Path,
    dst: Path,
    f: Callable[[dict], Awaitable[dict]],
    *,
    key: str,
    num_concurrent: int,
    keep_columns: List[str],
    on_error: OnError,
    progress: Optional[Callable[[], None]] = None,
):
    """
    Apply an async transformation to exactly one representative row from each
    equivalence class in *src* (rows that share *key*), writing a row for every
    line in *src* to *dst*.

    ### Parameters

     - `src, dst : Path`: source and destination JSONL files.
     - `f : Callable[[dict], Awaitable[dict]]`: async function invoked once per distinct value of *key*.
     - `key : str`: column whose value defines the equivalence classes.
     - `num_concurrent : int`: maximum number of concurrent invocations of *f*.
     - `keep_columns : List[str]`: columns to copy verbatim from *src* to each output row.
     - `on_error : Literal["print", "raise"]`: how to handle inconsistencies while resuming.
     - `progress`: an optional function that is called after each row is processed, even if *f* does not receive
       the row. You can use this to display a progress bar.

    ### Behaviour

    - The first time a key is encountered, that row is passed to *f*; its result
       is cached and duplicated for every row in the same class.

    - If *dst* already exists, rows are read to determine which keys are
      complete so the computation can resume without repeating work.

    ### Example

     Consider the following input file:

     ```jsonl
     { "key": 10, "other": "A", "discarded": "X" }
     { "key": 20, "other": "B", "discarded": "Y" }
     { "key": 10, "other": "C", "discarded": "Z" }
     ```

     Consider the following application (inside an async context):

     ```python
     async def compute(row):
         return { "result": row["key"] + 1, "key": row["key"] }

     await map_by_key_jsonl_file(
         src,
         dst,
         f=compute,
         key="key",
         keep_columns=["other"],
         on_error="raise",
         num_concurrent=1,
     )
     ```

     Because the file contains two rows whose key is 10 and one whose key is 20,
     *f* is called twice: once with the row with *key* 10 and once with the row
     with *key* 20.

     The output file *dst* will be a permutation of:

     ```jsonl
     { "key": 10, "result": 11, "other": "A" }
     { "key": 20, "result": 21, "other": "B" }
     { "key": 10, "result": 11, "other": "C" }
     ```

     It omits the *discarded* column. We always emit the *key* column. We
     emit the column *other* because it was specified in the *keep_columns*
     argument. Finally, we include all the columns from *f*'s output.
    """

    # The assumption is that src may be enormous, and we don't want to read
    # all of it into memory.
    MAX_SRC_LINES_TO_HOLD_IN_MEMORY = 100

    f_args_buffer = asyncio.Queue(MAX_SRC_LINES_TO_HOLD_IN_MEMORY)

    # f_results[key] is a future that will hold the result of applying f to the
    # representative row for key. We will eventually hold all results from f
    # in memory. So, f should return a compact result.
    f_results: Dict[str, asyncio.Future[dict]] = {}

    dst_rows_buffer = asyncio.Queue()

    async def read_src_proc(skip_rows: int):
        with src.open("rt") as f:
            for line_num, line in enumerate(f):
                if line_num < skip_rows:
                    continue
                row = json.loads(line)
                row_key = row[key]

                # If row_key not in f_results, this is the representative row.
                # So, we add it to f_args_buffer.
                if row_key not in f_results:
                    f_results[row_key] = asyncio.Future()
                    # The await below stops us from keeping too many full rows
                    # in memory if f is slow.
                    await f_args_buffer.put(row)

                if progress is not None:
                    progress()

                partial_dst_row = {k: row[k] for k in keep_columns}
                partial_dst_row[key] = row_key
                # The await below stops us from keeping too many output rows
                # in memory if f is slow.
                await dst_rows_buffer.put(partial_dst_row)
            # The Nones below signal end of input to the other processes.
            await dst_rows_buffer.put(None)
            for _ in range(num_concurrent):
                await f_args_buffer.put(None)

    async def write_dst_proc():
        with dst.open("at") as dst_f:
            while True:
                # It is partial because we don't know the result of f yet.
                partial_dst_row = await dst_rows_buffer.get()
                if partial_dst_row is None:
                    break
                try:
                    f_result = await f_results[partial_dst_row[key]]
                except Exception:
                    # If f had failed, we skip it on output. We would have either
                    # printed a warning once, or raised an exception earlier that
                    # would have aborted the whole task group.
                    continue

                dst_row = {**partial_dst_row, **f_result}
                json.dump(dst_row, dst_f)
                dst_f.write("\n")
                dst_f.flush()

    async def apply_f_proc():
        while True:
            row = await f_args_buffer.get()
            if row is None:
                break
            row_key = row[key]
            f_slot = f_results[row_key]
            try:
                result = await f(row)
                f_slot.set_result(result)
            except Exception as e:
                f_slot.set_exception(e)
                _error(on_error, f"Error applying f to {row}: {e}")

    def initialize_f_results(dst_f):
        skip_rows = 0
        for line in dst_f:
            skip_rows = skip_rows + 1
            row = json.loads(line)
            row_key = row[key]
            if row_key not in f_results:
                fut = asyncio.Future()
                fut.set_result(
                    {
                        k: row[k]
                        for k in row.keys()
                        if k != key and k not in keep_columns
                    }
                )
                f_results[row_key] = fut
            if progress is not None:
                progress()
        return skip_rows

    async with asyncio.TaskGroup() as tg:
        if dst.exists():
            with dst.open("rt") as dst_f:
                skip_rows = initialize_f_results(dst_f)
        else:
            skip_rows = 0

        tg.create_task(read_src_proc(skip_rows))
        tg.create_task(write_dst_proc())
        for _ in range(num_concurrent):
            tg.create_task(apply_f_proc())

run_bounded_create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, limit: int, on_error: OnError) async

Encapsulates the boilerplate needed to compose create_or_resume_jsonl_file with run_bounded.
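
Here is a minimal usage sketch, inside an async context, reusing the hypothetical value_generator and file name from the create_or_resume_jsonl_file example above and running at most 10 rows concurrently.

await run_bounded_create_or_resume_jsonl_file(
    file_name=Path("completions.jsonl"),
    key_name="prompt",
    key_count=3,
    key_generator=iter(["hello", "world"]),
    value_generator=value_generator,
    limit=10,
    on_error="raise",
)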

Source code in src/abstractions/storage.py
async def run_bounded_create_or_resume_jsonl_file(
    file_name: Path,
    key_name: str,
    key_count: int,
    key_generator: Iterator[str],
    value_generator: RowGenerator,
    *,
    limit: int,
    on_error: OnError,
):
    """
    Encapsulates the boilerplate needed to compose `create_or_resume_jsonl_file`
    with `run_bounded`.
    """

    async def parallel_value_generator(
        new_value_counts: List[Tuple[str, int]],
    ) -> AsyncIterator[Awaitable[dict]]:
        async for value in run_bounded(value_generator(new_value_counts), limit=limit):
            yield value

    await create_or_resume_jsonl_file(
        file_name=file_name,
        key_name=key_name,
        key_count=key_count,
        key_generator=key_generator,
        value_generator=parallel_value_generator,
        on_error=on_error,
    )