The Async\TaskGroup Class

(PHP 8.6+, True Async 1.0)

Introduction

When working with coroutines, you often need to launch several tasks and wait for their results. Using spawn() and await() directly, the developer takes responsibility for ensuring that every coroutine is either awaited or cancelled. A forgotten coroutine keeps running, an unhandled error is lost, and cancelling a group of tasks requires manual code.

The await_all() and await_any() functions don’t account for logical relationships between different tasks. For example, when you need to make several requests, take the first result, and cancel the rest, await_any() requires additional code from the programmer to cancel the remaining tasks. Such code can be quite complex, so await_all() and await_any() should be considered anti-patterns in this situation.

Scope is not a good fit for this purpose either: task coroutines may spawn child coroutines of their own, so the programmer would have to keep a separate list of the task coroutines and track them manually.

TaskGroup solves all these problems. It is a high-level structured concurrency pattern that guarantees every task is either awaited or cancelled. It groups tasks logically and lets you operate on them as a single unit.

Waiting Strategies

TaskGroup provides several strategies for waiting on results: all(), race(), and any(). Each returns a Future, so a timeout can be passed when awaiting it: ->await(Async\timeout(5.0)).

Concurrency Limit

When the concurrency parameter is specified, TaskGroup works as a coroutine pool: tasks exceeding the limit wait in a queue and don’t create a coroutine until a free slot appears. This saves memory and controls load when processing a large number of tasks.

TaskGroup and Scope

TaskGroup uses Scope for managing the lifecycle of task coroutines. When creating a TaskGroup, you can pass an existing Scope or let TaskGroup create a child Scope from the current one. All tasks added to TaskGroup execute inside this Scope. This means that when TaskGroup is cancelled or destroyed, all coroutines will be automatically cancelled, ensuring safe resource management and preventing leaks.
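
As a minimal sketch of passing an existing Scope, the hypothetical helper below receives a Scope owned by the caller and binds a TaskGroup to it through the scope parameter listed in the class overview. The function name loadDashboard and its two callables are illustrative assumptions, not part of the API.

```php
<?php
// Hypothetical helper: the caller owns $scope and decides when to cancel it.
function loadDashboard(Async\Scope $scope, callable $loadStats, callable $loadNews): array
{
    // Bind the group to the caller's Scope instead of letting it create a child Scope.
    $group = new Async\TaskGroup(scope: $scope);

    $group->spawnWithKey('stats', $loadStats);
    $group->spawnWithKey('news', $loadNews);

    // Results are keyed by the keys given to spawnWithKey().
    return $group->all()->await();
}
```

Because both tasks run inside the caller's Scope, the lifecycle described above suggests that cancelling that Scope would cancel them too.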

Sealing and Iteration

TaskGroup allows adding tasks dynamically, until it is sealed using the seal() method.

The all() method returns a Future that resolves once all tasks added so far have completed. This allows using TaskGroup in a loop: tasks are added dynamically, and all() is called to get the results of the current set of tasks.
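
The loop described here might look like the following sketch, which feeds work into one unsealed TaskGroup in rounds and waits for each round with all(). The helper name processInBatches and the shape of $batches are assumptions made for illustration. The value returned by all() is deliberately not used, since the text above does not say whether it also includes results from earlier rounds.

```php
<?php
// Hypothetical helper: run $handler over each batch, one round at a time.
function processInBatches(array $batches, callable $handler): array
{
    $group = new Async\TaskGroup();   // never sealed, so tasks can keep being added
    $processed = [];

    foreach ($batches as $name => $items) {
        foreach ($items as $item) {
            // spawn() forwards extra arguments to the task callable.
            $group->spawn($handler, $item);
        }

        // Wait until every task added so far has completed.
        $group->all()->await();
        $processed[$name] = count($items);
    }

    return $processed;
}
```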

TaskGroup also supports foreach for iterating over results as they become ready. In this case, seal() must be called after the last task is added. Sealing signals that no new tasks will arrive, so foreach can finish once all results have been processed.

Class Overview

final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {

    /* Methods */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Adding tasks */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Waiting for results */
    public all(bool $ignoreErrors = false): Async\Future
    public race(): Async\Future
    public any(): Async\Future
    public awaitCompletion(): void

    /* Lifecycle */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* State */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Results and errors */
    public getResults(): array
    public getErrors(): array
    public suppressErrors(): void

    /* Iteration */
    public getIterator(): Iterator
}

Examples

all() – Parallel Data Loading

The most common scenario – loading data from multiple sources simultaneously:

$group = new Async\TaskGroup();

$group->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

All three requests execute in parallel. If any of them throws an exception, the Future returned by all() rejects with a CompositeException.
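
A failure from all() could be handled along the following lines. This is a sketch under assumptions: it treats CompositeException as living in the Async namespace, which the text does not state, and it assumes that getResults() and getErrors() remain usable after the Future has rejected. The helper name loadProfileData is hypothetical.

```php
<?php
// Hypothetical helper: return whatever loaded, plus the errors that occurred.
function loadProfileData(callable $loadUser, callable $loadOrders): array
{
    $group = new Async\TaskGroup();
    $group->spawnWithKey('user', $loadUser);
    $group->spawnWithKey('orders', $loadOrders);

    try {
        return ['data' => $group->all()->await(), 'errors' => []];
    } catch (Async\CompositeException $e) { // namespace is an assumption
        // Mark the errors as handled, as the any() example does, so they are
        // not reported again when the group is destroyed (assumed to apply here too).
        $group->suppressErrors();

        return ['data' => $group->getResults(), 'errors' => $group->getErrors()];
    }
}
```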

race() – Hedged Requests

The “hedged request” pattern – send the same request to multiple replicas and take the first response. This reduces latency with slow or overloaded servers:

$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];

$group = new Async\TaskGroup();

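foreach ($replicas as $host) {
    // pg_query() expects a connection, not a host name, so each task opens its own
    $group->spawn(fn() => pg_query(pg_connect("host={$host}"), 'SELECT * FROM products WHERE id = 42'));
}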

// First response is the result, other tasks continue running
$product = $group->race()->await();
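
Since the losing tasks keep running after race() settles, a caller that no longer needs them may want to cancel the group. The sketch below wraps that in a hypothetical helper named firstResponse. It assumes that calling cancel() after race() has settled stops the remaining tasks without affecting the result already obtained.

```php
<?php
// Hypothetical helper: return the first settled result and stop the rest.
function firstResponse(array $tasks): mixed
{
    $group = new Async\TaskGroup();

    foreach ($tasks as $task) {
        $group->spawn($task);
    }

    try {
        return $group->race()->await();
    } finally {
        // Runs whether race() resolved or rejected: cancel the slower tasks.
        $group->cancel();
    }
}
```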

any() – First Successful Response

Query multiple providers, take the first successful response, ignoring errors:

$group = new Async\TaskGroup();

$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));

// any() ignores providers that failed and returns the first successful result
$results = $group->any()->await();

// Errors from failed providers must be explicitly handled, otherwise the destructor will throw an exception
$group->suppressErrors();

If all providers fail, the Future returned by any() rejects with a CompositeException containing all the errors.

Concurrency Limit – Processing a Queue

Process 10,000 tasks, but no more than 50 simultaneously:

$group = new Async\TaskGroup(concurrency: 50);

foreach ($urls as $url) {
    $group->spawn(fn() => httpClient()->get($url)->getBody());
}

$results = $group->all()->await();

TaskGroup automatically queues tasks. A coroutine is created only when a free slot appears, saving memory with large volumes of tasks.

Iterating Over Results as They Complete

Process results without waiting for all tasks to finish:

$group = new Async\TaskGroup();

foreach ($imageFiles as $file) {
    $group->spawn(fn() => processImage($file));
}

$group->seal();

foreach ($group as $key => $result) {
    // Results arrive as they become ready, not in the order they were added
    saveToStorage($result);
}

Timeout for a Task Group

Limit the waiting time for results:

$group = new Async\TaskGroup();

$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();

try {
    $results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
    echo "Failed to get data within 5 seconds";
}

Analogues in Other Languages

| Capability | PHP TaskGroup | Python asyncio.TaskGroup | Java StructuredTaskScope | Kotlin coroutineScope |
|---|---|---|---|---|
| Structured concurrency | seal() + all()->await() | async with block | try-with-resources + join() | Automatically via scope |
| Waiting strategies | all(), race(), any() -> Future | Only all (via async with) | ShutdownOnSuccess, ShutdownOnFailure | async/await, select |
| Concurrency limit | concurrency: N | No (need Semaphore) | No | No (need Semaphore) |
| Result iteration | foreach as they complete | No | No | Channel |
| Error handling | CompositeException, getErrors() | ExceptionGroup | throwIfFailed() | Exception cancels scope |

PHP TaskGroup combines capabilities that in other languages are spread across multiple primitives: concurrency limiting without a semaphore, multiple waiting strategies in a single object, and result iteration as they complete.
