Abstractions
abstractions.async_abstractions
This module contains abstractions for async programming.
run_bounded(tasks: AsyncIterator[Awaitable[T]], limit: int) -> AsyncIterator[Awaitable[T]]
async
There are many situations where we want to run lots of tasks with a bounded amount of concurrency. For example, we may want to make thousands of requests to an API, but not run more than a few requests concurrently. We may also want to run thousands of containerized jobs on a machine that has multiple cores, but not run too many containers concurrently.
The simplest way to achieve this is with a semaphore:
sema = asyncio.Semaphore(limit)

async def the_task(t):
    async with sema:
        ...  # Do the task.

await asyncio.gather(*(the_task(t) for t in tasks))
However, this approach does not address another problem that often occurs in these situations, which is that preparing tasks can take a lot of time and memory. For example, if each task is sourced from a file, and the file has thousands of lines, we are forced to read the entire file into memory before we can start running tasks.
The run_bounded function addresses this problem. You can use it to write code such as the following:
async def task(line):
    ...  # Do the task.

async def task_gen():
    with open("tasks.txt") as f:
        for line in f:
            yield task(line)

async for result in run_bounded(task_gen(), limit=10):
    print(result)
The code above runs at most 10 tasks concurrently while streaming the file line by line. Note that results may be yielded out of order.
Source code in src/abstractions/async_abstractions.py
abstractions.iterables
This module contains abstractions for iterables.
batches(iterable: Iterable[T], *, batch_size: int, epochs: int) -> Iterator[List[T]]
Yields batches of items, for the given number of epochs.
The final batch may not be of length batch_size. At an epoch boundary, a batch may contain items from two successive epochs.
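For illustration, here is a small sketch of how batches can be used. The output shown assumes that items are yielded in their original order and that batches are filled greedily across epoch boundaries; treat it as an illustration rather than a guaranteed result:

from abstractions.iterables import batches

for batch in batches([1, 2, 3], batch_size=2, epochs=2):
    print(batch)

# Under the stated assumptions, the stream of items is 1, 2, 3, 1, 2, 3,
# so this would print:
#   [1, 2]
#   [3, 1]   <- spans the boundary between the first and second epoch
#   [2, 3]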
Source code in src/abstractions/iterables.py
abstractions.storage
This module contains abstractions for data storage.
create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, on_error: OnError)
async
An abstraction to help persist generated data to a JSONL file, with support for resuming from an interrupted run.
The goal is to produce a JSONL file where each line has the shape { key_name: value, ... }, and each value appears exactly key_count times. To use this function, the caller must be able to generate the list of expected keys with key_generator, and then produce each row with value_generator.

The value_generator receives a list of (value, count) tuples, and must produce a row with the shape { key_name: value, ... } exactly count times for each value.
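A hypothetical usage sketch follows. The exact RowGenerator protocol is not reproduced here; the sketch assumes it is a callable that receives the list of (value, count) pairs and returns an async iterator of row dicts. The file name, key name, and row contents are made up for illustration, and on_error="raise" is assumed from the OnError options documented below:

import asyncio
from pathlib import Path

from abstractions.storage import create_or_resume_jsonl_file

def prompt_keys():
    # The expected values of the "prompt" column (hypothetical).
    for i in range(3):
        yield f"prompt-{i}"

async def make_rows(pending):
    # `pending` is assumed to be a list of (value, count) tuples: for each
    # value, `count` rows of shape { "prompt": value, ... } are still needed.
    for value, count in pending:
        for _ in range(count):
            yield {"prompt": value, "completion": value.upper()}

async def main():
    await create_or_resume_jsonl_file(
        Path("completions.jsonl"),
        key_name="prompt",
        key_count=2,  # each prompt should appear exactly twice in the file
        key_generator=prompt_keys(),
        value_generator=make_rows,
        on_error="raise",
    )

asyncio.run(main())

If the run is interrupted, re-running the same code should pick up from the rows already present in completions.jsonl, since the function supports resuming.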
Source code in src/abstractions/storage.py
map_by_key_jsonl_file(src: Path, dst: Path, f: Callable[[dict], Awaitable[dict]], *, key: str, num_concurrent: int, keep_columns: List[str], on_error: OnError, progress: Optional[Callable[[], None]] = None)
async
Apply an async transformation to exactly one representative row from each equivalence class in src (rows that share key), writing a row for every line in src to dst.
Parameters
src, dst : Path
    Source and destination JSONL files.
f : Callable[[dict], Awaitable[dict]]
    Async function invoked once per distinct value of key.
key : str
    Column whose value defines the equivalence classes.
num_concurrent : int
    Maximum number of concurrent invocations of f.
keep_columns : List[str]
    Columns to copy verbatim from src to each output row.
on_error : Literal["print", "raise"]
    How to handle inconsistencies while resuming.
progress : Optional[Callable[[], None]]
    An optional function that is called after each row is processed, even if f does not receive the row. You can use this to display a progress bar.
Behaviour
- The first time a key is encountered, that row is passed to f; its result is cached and duplicated for every row in the same class.
- If dst already exists, rows are read to determine which keys are complete so the computation can resume without repeating work.
Example
Consider the following input file:
{ "key": 10, "other": "A", "discarded": "X" }
{ "key": 20, "other": "B", "discarded": "Y" }
{ "key": 10, "other": "C", "discarded": "Z" }
Consider the following application (inside an async context):
async def compute(row):
    return { "result": row["key"] + 1, "key": row["key"] }

await map_by_key_jsonl_file(
    src,
    dst,
    f=compute,
    key="key",
    keep_columns=["other"],
    on_error="raise",
    num_concurrent=1,
)
Because the file contains two rows whose key is 10 and one whose key is 20, f is called twice: once with the row with key 10 and once with the row with key 20.
The output file dst will be a permutation of:
{ "key": 10, "result": 11, "other": "A" }
{ "key": 20, "result": 21, "other": "B" }
{ "key": 10, "result": 11, "other": "C" }
It omits the discarded column. We always emit the key column. We emit the column other because it was specified in the keep_columns argument. Finally, we include all the columns from f's output.
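The progress callback pairs naturally with a progress bar. Here is a sketch that continues the example above using tqdm (tqdm is an external package and an assumption here, not part of this module):

from tqdm import tqdm

with src.open() as f:
    total_rows = sum(1 for _ in f)  # one progress tick per input row

with tqdm(total=total_rows) as bar:
    await map_by_key_jsonl_file(
        src,
        dst,
        f=compute,
        key="key",
        keep_columns=["other"],
        on_error="raise",
        num_concurrent=1,
        progress=bar.update,  # called with no arguments after each row
    )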
Source code in src/abstractions/storage.py
run_bounded_create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, limit: int, on_error: OnError)
async
Encapsulates the boilerplate needed to compose create_or_resume_jsonl_file with run_bounded.
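A hypothetical call-site sketch, reusing the generators from the create_or_resume_jsonl_file sketch above. The exact contract between value_generator and run_bounded is not shown here, so treat this as an assumption rather than the definitive API:

await run_bounded_create_or_resume_jsonl_file(
    Path("completions.jsonl"),
    key_name="prompt",
    key_count=2,
    key_generator=prompt_keys(),   # hypothetical, from the earlier sketch
    value_generator=make_rows,     # hypothetical, from the earlier sketch
    limit=10,                      # bound on concurrency, as in run_bounded
    on_error="raise",
)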