Abstractions
abstractions.async_abstractions
This module contains abstractions for async programming.
run_bounded(tasks: AsyncIterator[Awaitable[T]], limit: int) -> AsyncIterator[Awaitable[T]]
async
There are many situations where we want to run lots of tasks with a bounded amount of concurrency. For example, we may want to make thousands of requests to an API, but not run more than a few requests concurrently. We may also want to run thousands of containerized jobs on a machine that has multiple cores, but not run too many containers concurrently.
The simplest way to achieve this is with a semaphore:
import asyncio

sema = asyncio.Semaphore(limit)

async def the_task(t):
    async with sema:
        ...  # Do the task.

await asyncio.gather(*(the_task(t) for t in tasks))
However, this approach does not address another problem that often occurs in these situations, which is that preparing tasks can take a lot of time and memory. For example, if each task is sourced from a file, and the file has thousands of lines, we are forced to read the entire file into memory before we can start running tasks.
The run_bounded function addresses this problem. You can use it to
write code such as the following:
async def task(line):
    ...  # Do the task.

async def task_gen():
    with open("tasks.txt") as f:
        for line in f:
            yield task(line)

async for result in run_bounded(task_gen(), limit=10):
    print(result)
The code above runs at most 10 tasks concurrently while streaming the file line by line, instead of reading the whole file into memory first. Results are yielded as tasks complete, so they may arrive out of order.
Source code in src/abstractions/async_abstractions.py
abstractions.iterables
This module contains abstractions for iterables.
batches(iterable: Iterable[T], *, batch_size: int, epochs: int) -> Iterator[List[T]]
Yields batches of items, for the given number of epochs.
The final batch may not be of length batch_size. At an epoch boundary, a batch may contain items from two successive epochs.
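For illustration, here is a sketch of the behaviour, assuming that batches yields items in input order and does not shuffle between epochs:

from abstractions.iterables import batches

for batch in batches(range(5), batch_size=2, epochs=2):
    print(batch)

# Under the stated assumptions, this prints:
# [0, 1]
# [2, 3]
# [4, 0]   <- spans the boundary between the first and second epoch
# [1, 2]
# [3, 4]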
Source code in src/abstractions/iterables.py
recv_dict_vec(out_keys: List[str], f: Callable)
The purpose of this abstraction is to make it easier to process batches of items with the Hugging Face datasets library.
This function wraps f as follows. The wrapped function receives a dictionary whose values are lists, e.g.:

{ "key_1": [v_11, v_12, v_13, ...], "key_2": [v_21, v_22, v_23, ...] }
We assume that the dictionary has at least one key.
It calls f on each item:
f({"key_1": v_11, "key_2": v_21})
f({"key_1": v_12, "key_2": v_22})
f({"key_1": v_13, "key_2": v_23})
...
f must return either None (to drop an item) or a dictionary with exactly the keys in out_keys.
The wrapped function will accumulate all results from f into a dictionary
of lists, similar to the input.
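As a rough illustration of the wrapped function on its own (the values here are made up, and we assume, as in the datasets example below, that returning None drops an item):

from abstractions.iterables import recv_dict_vec

def keep_small(item):
    # Drop items whose "x" is large; otherwise emit exactly the out_keys.
    if item["x"] > 2:
        return None
    return { "a": item["x"] * 2, "b": item["y"] }

g = recv_dict_vec(["a", "b"], keep_small)
g({ "x": [1, 2, 3], "y": ["p", "q", "r"] })
# => { "a": [2, 4], "b": ["p", "q"] }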
This means that you can do this:
import datasets

the_dataset = datasets.load_dataset("openai/openai_humaneval")

def f(item):
    if len(item["canonical_solution"]) > 50:
        return None
    return {
        "canonical_solution": item["canonical_solution"],
        "prompt": item["prompt"],
    }

the_dataset = the_dataset.map(
    recv_dict_vec(["prompt", "canonical_solution"], f),
    remove_columns=the_dataset["test"].column_names,
    batched=True,
    batch_size=10,
    num_proc=2,
)
Note that you must remove the original columns if f filters out any items. (f may itself return a column that is removed.) Finally, note that the .column_names property of a DatasetDict returns a dictionary of column names keyed by split, which is why the code above uses the_dataset["test"].column_names.
Source code in src/abstractions/iterables.py
abstractions.storage
This module contains abstractions for data storage.
disk_cache(path: str | Path)
Cache variables assigned inside the with block to disk using pickle.
Example:
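A minimal sketch of the call pattern, assuming that variables assigned inside the with block are pickled to the given path so that a later run can reuse them instead of recomputing; expensive_computation is a placeholder:

from pathlib import Path
from abstractions.storage import disk_cache

def expensive_computation():
    # Placeholder for real work; any picklable value works.
    return [x * x for x in range(10)]

with disk_cache(Path("cache.pkl")):
    # Assignments made in this block are cached to cache.pkl.
    results = expensive_computation()

print(results)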
Source code in src/abstractions/storage.py
create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, on_error: OnError)
async
An abstraction to help persist generated data to a JSONL file that supports resuming from an interrupted run.
The goal is to produce a JSONL file where each line has the shape:

{ key_name: value, ... }

and each value produced by key_generator appears exactly key_count times. To use this function, the caller must be able to generate the list of expected keys with key_generator, and then produce each row with value_generator.
The value_generator receives a list of (value, count) tuples, and must
produce a row with the shape { key_name: value, ... } exactly count times.
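The exact RowGenerator protocol is defined in the source. The sketch below only illustrates the contract described above, under the assumption that value_generator is an async generator function that receives the pending (value, count) pairs and yields one dict per required row; generate_completion is a placeholder:

from pathlib import Path
from abstractions.storage import create_or_resume_jsonl_file

async def generate_completion(prompt: str) -> str:
    ...  # placeholder for real work, e.g. an API call

async def value_generator(pending):
    # pending is a list of (value, count) tuples; for each value we must
    # yield a row of the shape { "prompt": value, ... } exactly count times.
    for value, count in pending:
        for _ in range(count):
            yield { "prompt": value, "completion": await generate_completion(value) }

await create_or_resume_jsonl_file(
    Path("completions.jsonl"),
    key_name="prompt",
    key_count=3,  # each prompt should end up on exactly 3 lines
    key_generator=iter(["prompt A", "prompt B"]),
    value_generator=value_generator,
    on_error="raise",
)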
Source code in src/abstractions/storage.py
flatmap_by_key_jsonl_file(src: Path, dst: Path, f: Callable[[dict], Awaitable[List[dict]]], *, key: str, num_concurrent: int, keep_columns: List[str], on_error: OnError, progress: Optional[Callable[[bool], None]] = None)
async
Apply an async transformation to each row in src, where the transformation can produce multiple output rows per input row.
IMPORTANT: Keys must be unique in src. Each row must have a distinct value for key.
Parameters
- src, dst : Path: source and destination JSONL files.
- f : Callable[[dict], Awaitable[List[dict]]]: async function invoked once per row. Returns a list of dicts.
- key : str: column that uniquely identifies each row in src.
- num_concurrent : int: maximum number of concurrent invocations of f.
- keep_columns : List[str]: columns to copy verbatim from src to each output row.
- on_error : Literal["print", "raise"]: how to handle errors from f.
- progress: an optional function that is called after each output row is written to dst (or once per input row on failure). The function receives a boolean indicating success (True) or failure (False). During initialization (when resuming), it is called with True for each existing row in dst.
Behaviour
- Each row in src is passed to f, which returns a list of dicts.
- For each dict in the list, an output row is written combining the kept columns with the dict from f.
- If f returns an empty list, no output rows are written for that input row.
- If dst already exists, rows are read to determine which keys are complete so the computation can resume without repeating work.
Example
Consider the following input file:

{ "key": 10, "other": "A" }
{ "key": 20, "other": "B" }
Consider the following application (inside an async context):
async def compute(row):
    # Returns multiple results per row.
    return [
        { "result": row["key"] + 1, "key": row["key"] },
        { "result": row["key"] + 2, "key": row["key"] },
    ]

await flatmap_by_key_jsonl_file(
    src,
    dst,
    f=compute,
    key="key",
    keep_columns=["other"],
    on_error="raise",
    num_concurrent=1,
)
The output file dst will be a permutation of:
{ "key": 10, "result": 11, "other": "A" }
{ "key": 10, "result": 12, "other": "A" }
{ "key": 20, "result": 21, "other": "B" }
{ "key": 20, "result": 22, "other": "B" }
Source code in src/abstractions/storage.py
map_by_key_jsonl_file(src: Path, dst: Path, f: Callable[[dict], Awaitable[dict]], *, key: str, num_concurrent: int, keep_columns: List[str], on_error: OnError, progress: Optional[Callable[[bool], None]] = None)
async
Apply an async transformation to exactly one representative row from each equivalence class in src (rows that share key), writing a row for every line in src to dst.
Parameters
- src, dst : Path: source and destination JSONL files.
- f : Callable[[dict], Awaitable[dict]]: async function invoked once per distinct value of key.
- key : str: column whose value defines the equivalence classes.
- num_concurrent : int: maximum number of concurrent invocations of f.
- keep_columns : List[str]: columns to copy verbatim from src to each output row.
- on_error : Literal["print", "raise"]: how to handle inconsistencies while resuming.
- progress: an optional function that is called after each row is written to dst (or skipped due to failure). The function receives a boolean indicating success (True) or failure (False). You can use this to display a progress bar. During initialization (when resuming), it is called with True for each existing row in dst.
Behaviour
- The first time a key is encountered, that row is passed to f; its result is cached and duplicated for every row in the same class.
- If dst already exists, rows are read to determine which keys are complete so the computation can resume without repeating work.
Example
Consider the following input file:
{ "key": 10, "other": "A", "discarded": "X" }
{ "key": 20, "other": "B", "discarded": "Y" }
{ "key": 10, "other": "C", "discarded": "Z" }
Consider the following application (inside an async context):
async def compute(row):
    return { "result": row["key"] + 1, "key": row["key"] }

await map_by_key_jsonl_file(
    src,
    dst,
    f=compute,
    key="key",
    keep_columns=["other"],
    on_error="raise",
    num_concurrent=1,
)
Because the file contains two rows whose key is 10 and one row whose key is 20, f is called exactly twice: once with the first row whose key is 10 and once with the row whose key is 20.
The output file dst will be a permutation of:
{ "key": 10, "result": 11, "other": "A" }
{ "key": 20, "result": 21, "other": "B" }
{ "key": 10, "result": 11, "other": "C" }
The discarded column is omitted. The key column is always emitted, the other column is emitted because it appears in the keep_columns argument, and all columns from f's output are included.
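Because progress receives only a success flag, it composes naturally with a progress bar. Here is a sketch that reuses compute, src, and dst from the example above; tqdm and the way the total is computed are assumptions, not part of this API:

from tqdm import tqdm

with open(src) as f:
    total_rows = sum(1 for _ in f)  # one output row per input row

bar = tqdm(total=total_rows)

def on_progress(ok: bool) -> None:
    # Called once per row written to dst (ok=True) or skipped on failure (ok=False).
    bar.update(1)

await map_by_key_jsonl_file(
    src,
    dst,
    f=compute,
    key="key",
    keep_columns=["other"],
    on_error="raise",
    num_concurrent=1,
    progress=on_progress,
)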
Source code in src/abstractions/storage.py
run_bounded_create_or_resume_jsonl_file(file_name: Path, key_name: str, key_count: int, key_generator: Iterator[str], value_generator: RowGenerator, *, limit: int, on_error: OnError)
async
Encapsulates the boilerplate needed to compose create_or_resume_jsonl_file
with run_bounded.