Async Primitives for TypeScript
A collection of async primitives for TypeScript. Queue, Mutex, AsyncStream, delay, throttle, and more.
I've built a collection of async utilities for TypeScript: @temelj/async.
Here's an overview of what's included.
Time and Delays
delay resolves after a specified duration:
import { delay } from "@temelj/async";
await delay(100); // Resolves after 100ms
// Cancel with AbortSignal
const controller = new AbortController();
delay(1000, { signal: controller.signal });
controller.abort(); // Rejects with AbortError

timeout races a promise against a timer:
import { timeout, TimeoutError } from "@temelj/async";
try {
const result = await timeout(fetchData(), 5000);
} catch (e) {
if (e instanceof TimeoutError) {
console.log("Request timed out");
}
}
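The race itself can be sketched with the standard Promise.race. This is only an illustration of the technique, not the library's implementation: withTimeout and SketchTimeoutError are hypothetical names made up for this example.

```typescript
// Hypothetical sketch; the library's own timeout and TimeoutError may differ.
class SketchTimeoutError extends Error {
  constructor(ms: number) {
    super(`Timed out after ${ms}ms`);
    this.name = "SketchTimeoutError";
  }
}

async function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  // This promise never resolves; it only rejects once the timer fires.
  const expiry = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new SketchTimeoutError(ms)), ms);
  });
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([promise, expiry]);
  } finally {
    // Always clear the timer so it cannot fire after the race is decided.
    clearTimeout(timer);
  }
}
```

Note that losing the race does not cancel the underlying operation; it only stops waiting for it.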
// Or use a fallback value instead of throwing
const result = await timeout(slowOperation(), 1000, {
fallback: "default",
});

wait polls until a predicate returns true:
import { wait } from "@temelj/async";
await wait(() => document.readyState === "complete", { interval: 100, timeout: 5000 });

Deferred Promises
Sometimes you need a promise with externally controlled resolution.
defer creates one:
import { defer } from "@temelj/async";
const { promise, resolve, reject } = defer<number>();
// Resolve later
resolve(42);
// Or reject instead (a promise settles only once, so only the first call takes effect)
reject(new Error("failed"));
const result = await promise;

This is useful when building custom async primitives or integrating with event-based APIs.
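As a rough illustration of the event-based case, a deferred can be built from the standard Promise constructor and settled from a callback. The createDeferred helper and the TinyEmitter class below are hypothetical stand-ins written for this sketch; they are not the library's implementation.

```typescript
// Hypothetical sketch: not the implementation used by @temelj/async.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
  reject: (reason: unknown) => void;
}

function createDeferred<T>(): Deferred<T> {
  let resolve!: (value: T) => void;
  let reject!: (reason: unknown) => void;
  // The executor runs synchronously, so both callbacks are assigned
  // before the constructor returns.
  const promise = new Promise<T>((res, rej) => {
    resolve = (value) => res(value);
    reject = (reason) => rej(reason);
  });
  return { promise, resolve, reject };
}

// A minimal event source standing in for any callback-based API.
class TinyEmitter {
  private listeners: Array<(payload: string) => void> = [];

  on(listener: (payload: string) => void): void {
    this.listeners.push(listener);
  }

  emit(payload: string): void {
    for (const listener of this.listeners) listener(payload);
  }
}

// Turn the next emitted event into a promise.
function nextEvent(emitter: TinyEmitter): Promise<string> {
  const deferred = createDeferred<string>();
  emitter.on((payload) => deferred.resolve(payload));
  return deferred.promise;
}
```

The listener in this sketch stays registered after the first event. Later events call resolve again, which has no effect because the promise has already settled.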
Concurrency Control
limit wraps a function to enforce a concurrency limit:
import { limit } from "@temelj/async";
const processItem = limit(async (item: Item) => {
return await process(item);
}, 2); // Max 2 concurrent
const results = await Promise.all(items.map(processItem));

Mutex ensures only one task accesses a resource at a time:
import { Mutex } from "@temelj/async";
const mutex = new Mutex();
async function updateSharedResource() {
const release = await mutex.acquire();
try {
// Exclusive access
await modifyResource();
} finally {
release();
}
}
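To show why the acquire/release pattern serializes access, here is a minimal sketch of a mutex built on a promise chain. SimpleMutex is a hypothetical name for this illustration, and the library's Mutex may be implemented quite differently.

```typescript
// Hypothetical sketch of a promise-chain mutex; not the library's implementation.
class SimpleMutex {
  // Resolves once the most recent holder has released the lock.
  private tail: Promise<void> = Promise.resolve();

  acquire(): Promise<() => void> {
    let release!: () => void;
    const released = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    // Wait for the previous holder, then hand out this holder's release function.
    const ready = this.tail.then(() => release);
    // Later callers queue up behind this holder.
    this.tail = released;
    return ready;
  }
}
```

Each caller waits on the promise left behind by the previous holder, so the lock is granted in the order acquire was called. As in the example above, release belongs in a finally block, since a holder that never releases blocks everyone queued behind it.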
// Or use runExclusive for convenience
const result = await mutex.runExclusive(async () => {
return computeValue();
});

Barrier waits until a specified number of tasks have reached it:
import { Barrier } from "@temelj/async";
const barrier = new Barrier(3); // Wait for 3 tasks
// In 3 different async contexts
await barrier.wait();
// All three continue here simultaneously

Queue is a promise/task queue with priority and concurrency:
import { Queue } from "@temelj/async";
const q = new Queue({ concurrency: 2 });
q.add(
async () => {
await processItem(1);
},
// High priority
{ priority: 10 },
);
q.add(
async () => {
await processItem(2);
},
// Low priority
{ priority: 0 },
);
await q.onIdle; // Wait for all tasks to complete

Iteration
map transforms items concurrently with configurable concurrency:
import { map, Skip } from "@temelj/async";
// Transform with concurrency limit
const results = await map(
items,
async (item) => {
return transform(item);
},
{ concurrency: 3 },
);
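One common way to implement a concurrency-limited map is a small pool of workers that each pull the next unprocessed index. The sketch below assumes that approach; mapLimit is a hypothetical helper, and the library's map may work differently and supports more options.

```typescript
// Hypothetical sketch of a concurrency-limited map; not the library's code.
async function mapLimit<T, R>(
  items: readonly T[],
  mapper: (item: T, index: number) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await mapper(items[index], index);
    }
  }

  // Never start more workers than there are items.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) workers.push(worker());
  await Promise.all(workers);
  return results;
}
```

Results are written by index, so the output keeps the input order even when items finish out of order.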
// Skip items using the Skip symbol
const filtered = await map([1, 2, 3, 4, 5], (x) => {
if (x % 2 === 0) return Skip;
return x * 2;
}); // [2, 6, 10]
// Handle errors without stopping
await map(
items,
async (item) => {
if (item.fails) {
throw new Error("oops");
}
return item.value;
},
{ stopOnError: false },
); // Rejects with AggregateError

reduce processes items serially:
import { reduce } from "@temelj/async";
const sum = await reduce(
[1, 2, 3, 4],
async (acc, item) => {
return acc + item;
},
0,
); // 10

Rate Limiting
debounce delays execution until calls stop:
import { debounce } from "@temelj/async";
const debouncedSearch = debounce(async (query: string) => {
return await fetchResults(query);
}, 300);
// Only the last call within 300ms executes
debouncedSearch("h");
debouncedSearch("he");
debouncedSearch("hel");
debouncedSearch("hell");
debouncedSearch("hello"); // This one runs after 300ms
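The trailing-edge behavior comes from restarting a timer on every call. The sketch below shows only that core idea for a plain callback; debounceSketch is a hypothetical name, and unlike the library's debounce it does not return promises or support a leading edge.

```typescript
// Hypothetical sketch of trailing-edge debouncing; not the library's implementation.
function debounceSketch<A extends unknown[]>(
  fn: (...args: A) => void,
  waitMs: number,
): (...args: A) => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return (...args: A) => {
    // Every call cancels the pending run and schedules a fresh one,
    // so only the last call in a burst reaches fn.
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => {
      timer = undefined;
      fn(...args);
    }, waitMs);
  };
}
```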
// Leading edge option
const leading = debounce(fn, 300, { leading: true, trailing: false });

throttle limits execution to once per interval:
import { throttle } from "@temelj/async";
const throttledSave = throttle(async (data: Data) => {
await save(data);
}, 1000);
// First call executes immediately
throttledSave(data1);
// Subsequent calls within 1s return the same promise
throttledSave(data2);
throttledSave(data3); // Returns immediately with same promise as data2

Retry
retry handles transient failures with configurable backoff:
import { retry } from "@temelj/async";
const result = await retry(
async (attempt) => {
return await unstableOperation();
},
{
times: 3, // Retry up to 3 times
delay: 100, // 100ms between attempts
shouldRetry: (e) => e instanceof TransientError,
},
);
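The retry loop can be sketched as a try/catch inside a loop with a sleep between attempts. retrySketch is a hypothetical helper: its parameter names and its zero-based attempt numbering are assumptions made for this example, not the documented behavior of the library's retry.

```typescript
// Hypothetical sketch; parameter names and attempt numbering are assumptions,
// not the behavior of the library's retry.
async function retrySketch<T>(
  fn: (attempt: number) => Promise<T>,
  retries: number,
  delayMs: (attempt: number) => number,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn(attempt);
    } catch (error) {
      // Give up once the retry budget is spent and surface the last error.
      if (attempt >= retries) throw error;
      // Sleep before the next attempt.
      await new Promise<void>((resolve) =>
        setTimeout(() => resolve(), delayMs(attempt)),
      );
    }
  }
}
```

With this numbering, a delay function of `(attempt) => Math.pow(2, attempt) * 100` waits 100ms, 200ms, then 400ms before the three retries.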
// Exponential backoff with function
await retry(fn, {
times: 3,
delay: (attempt) => Math.pow(2, attempt) * 100, // 100, 200, 400ms
});

Stream
AsyncStream provides a fluent API for async pipelines:
import { AsyncStream } from "@temelj/async";
const results = await AsyncStream.from(items)
.map(async (x) => x * 2)
.filter(async (x) => x > 5)
.toArray();
// Works with async iterables
async function* generate() {
yield 1;
yield 2;
yield 3;
}
const result = await AsyncStream.from(generate())
.map((x) => x * 10)
.toArray(); // [10, 20, 30]
// With concurrency
await AsyncStream.from(items)
.map(async (x) => process(x), { concurrency: 3 })
.drain(); // Execute without collecting results

Common Options
Most functions accept a standard set of options:
import { map } from "@temelj/async";
await map(items, fn, {
signal: abortController.signal, // Cancel operation
concurrency: 3, // For map, limit concurrency
stopOnError: false, // Continue on errors, collect them
});

Conclusion
Add it to your project:
# pnpm
pnpm add @temelj/async
# npm
npm install @temelj/async
# JSR (Deno)
deno add jsr:@temelj/async

The package is part of Temelj, a core library for TypeScript.