The Async\TaskSet Class

(PHP 8.6+, True Async 1.0)

Introduction

TaskGroup is a good fit when the goal is the results rather than the tasks themselves. In many situations, however, you need to control the number of tasks while their results are consumed as a stream.

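Typical examples, each shown in the Examples section of this page, include processing results as tasks complete, streaming a large batch through a concurrency limit, and a worker loop that adds tasks dynamically.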

TaskSet is designed to solve these problems. It automatically removes completed tasks at the moment of result delivery via joinNext(), joinAll(), joinAny(), or foreach.

Differences from TaskGroup

| Property | TaskGroup | TaskSet |
|---|---|---|
| Result storage | All results until explicit request | Removed after delivery |
| Repeated method calls | Idempotent: same result | Each call: next element |
| count() | Total number of tasks | Number of undelivered tasks |
| Waiting methods | all(), race(), any() | joinAll(), joinNext(), joinAny() |
| Iteration | Entries remain | Entries removed after foreach |
| Use case | Fixed set of tasks | Dynamic task stream |

Idempotency vs Consumption

This is the key conceptual difference between TaskSet and TaskGroup.

TaskGroup is idempotent. Calls to race(), any(), all() always return the same result. Iteration via foreach always traverses all tasks. Results are stored in the group and available for repeated access:

$group = new Async\TaskGroup();

$group->spawn(fn() => "alpha");
$group->spawn(fn() => "beta");
$group->spawn(fn() => "gamma");
$group->seal();

// race() always returns the same first completed task
$first  = $group->race()->await(); // "alpha"
$same   = $group->race()->await(); // "alpha" — same result!

// all() always returns the full array
$all1 = $group->all()->await(); // ["alpha", "beta", "gamma"]
$all2 = $group->all()->await(); // ["alpha", "beta", "gamma"] — same array!

// foreach always traverses all elements
foreach ($group as $key => [$result, $error]) { /* 3 iterations */ }
foreach ($group as $key => [$result, $error]) { /* again 3 iterations */ }

echo $group->count(); // 3 — always 3

TaskSet is consuming. Each call to joinNext() / joinAny() extracts the next element and removes it from the set. A repeated foreach won’t find already delivered entries. This behavior is analogous to reading from a queue or channel:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");

// joinNext() returns the NEXT result each time
$first  = $set->joinNext()->await(); // "alpha"
$second = $set->joinNext()->await(); // "beta" — different result!
$third  = $set->joinNext()->await(); // "gamma"

echo $set->count(); // 0 — set is empty

// joinAll() after full consumption — empty array
$set->seal();
$rest = $set->joinAll()->await(); // [] — nothing to return

The same logic applies to iteration:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");
$set->seal();

// First foreach consumes all results
foreach ($set as $key => [$result, $error]) {
    echo "$result\n"; // "alpha", "beta", "gamma"
}

echo $set->count(); // 0

// Second foreach — empty, nothing to iterate
foreach ($set as $key => [$result, $error]) {
    echo "this won't execute\n";
}

Rule of thumb: if you need to access results repeatedly, use TaskGroup. If results are processed once and their memory should then be freed, use TaskSet.
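The queue analogy can be made concrete without the async extension. The following is a minimal sketch in plain PHP, not TaskSet code: it contrasts an idempotent read from an array with a consuming read from the standard SplQueue class. The variable names are illustrative only.

```php
<?php
declare(strict_types=1);

// Idempotent read: an array keeps its entries no matter how often it is read.
$stored = ['alpha', 'beta', 'gamma'];
$firstRead   = $stored[0];      // "alpha"
$secondRead  = $stored[0];      // "alpha" again
$storedCount = count($stored);  // still 3

// Consuming read: SplQueue::dequeue() returns the front element and removes it.
$queue = new SplQueue();
foreach ($stored as $value) {
    $queue->enqueue($value);
}

$first     = $queue->dequeue(); // "alpha"
$second    = $queue->dequeue(); // "beta": a different element on each call
$remaining = count($queue);     // 1, only "gamma" is left
```

The array behaves like TaskGroup (the same answer on every read), while the queue behaves like TaskSet (each read hands over one element and shrinks the count).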

Join Method Semantics

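Unlike TaskGroup, where race() / any() / all() leave entries in the group, TaskSet uses methods with join semantics: once a result is delivered, its entry is removed. joinNext() delivers the next completed result, joinAny() the first successful result, and joinAll() all remaining results.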

Automatic Cleanup

Auto-cleanup works at all result delivery points:

$set = new Async\TaskSet();

$set->spawn(fn() => "a");
$set->spawn(fn() => "b");
echo $set->count(); // 2

$set->joinNext()->await();
echo $set->count(); // 1

$set->joinNext()->await();
echo $set->count(); // 0

When iterating via foreach, each processed entry is removed immediately:

$set = new Async\TaskSet();

foreach ($urls as $url) {
    $set->spawn(fn() => fetch($url));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    // $set->count() decreases with each iteration
    process($result);
}

Concurrency Limit

Like TaskGroup, TaskSet supports concurrency limiting:

$set = new Async\TaskSet(concurrency: 10);

foreach ($tasks as $task) {
    $set->spawn(fn() => processTask($task));
}

Tasks exceeding the limit are queued and started when a slot becomes available.
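The queueing behaviour described above can be modelled in plain PHP. The sketch below is a toy scheduler built on Fibers (PHP 8.1+), written only to illustrate the idea of "start a queued task when a slot frees up". It is not how TaskSet is implemented, and runWithLimit() and $makeTask are hypothetical names introduced for this illustration.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a concurrency limit (NOT the TaskSet implementation).
 * Each task runs in a Fiber; at most $limit fibers are in flight at once.
 * Returns the peak number of tasks that were in flight simultaneously.
 */
function runWithLimit(array $tasks, int $limit, array &$log): int
{
    if ($limit < 1) {
        throw new InvalidArgumentException('Limit must be at least 1');
    }

    $pending = $tasks; // tasks waiting for a free slot, in insertion order
    $running = [];     // started fibers that have not finished yet
    $peak = 0;

    while ($pending !== [] || $running !== []) {
        // Fill free slots from the waiting queue.
        while ($pending !== [] && count($running) < $limit) {
            $name = array_key_first($pending);
            $fiber = new Fiber($pending[$name]);
            unset($pending[$name]);
            $log[] = "start $name";
            $fiber->start();
            if ($fiber->isTerminated()) {
                $log[] = "finish $name"; // finished without ever suspending
            } else {
                $running[$name] = $fiber;
            }
        }
        $peak = max($peak, count($running));

        // Give every in-flight task one more step and drop the finished ones.
        foreach ($running as $name => $fiber) {
            $fiber->resume();
            if ($fiber->isTerminated()) {
                unset($running[$name]);
                $log[] = "finish $name";
            }
        }
    }

    return $peak;
}

// Hypothetical task factory: the task "waits" $steps times before finishing.
$makeTask = fn(int $steps) => function () use ($steps): void {
    for ($i = 0; $i < $steps; $i++) {
        Fiber::suspend(); // stands in for waiting on I/O
    }
};

$log = [];
$peak = runWithLimit(
    ['a' => $makeTask(1), 'b' => $makeTask(2), 'c' => $makeTask(1), 'd' => $makeTask(1)],
    2,
    $log
);
```

With a limit of 2, the peak stays at 2 and task "c" is started only after "a" has finished, which mirrors the slot-based queueing that the concurrency parameter provides.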

Class Synopsis

final class Async\TaskSet implements Async\Awaitable, Countable, IteratorAggregate {

    /* Methods */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Adding tasks */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Waiting for results (with auto-cleanup) */
    public joinNext(): Async\Future
    public joinAny(): Async\Future
    public joinAll(bool $ignoreErrors = false): Async\Future

    /* Lifecycle */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* State */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Awaiting completion */
    public awaitCompletion(): void

    /* Iteration (with auto-cleanup) */
    public getIterator(): Iterator
}

Examples

joinAll() — parallel loading with auto-cleanup

$set = new Async\TaskSet();

$set->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$set->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$set->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$set->seal();
$data = $set->joinAll()->await();
// $set->count() === 0, all entries removed

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

joinNext() — processing tasks as they complete

$set = new Async\TaskSet(concurrency: 5);

foreach ($urls as $url) {
    $set->spawn(fn() => httpClient()->get($url)->getBody());
}
$set->seal();

while ($set->count() > 0) {
    $result = $set->joinNext()->await();
    echo "Got result, remaining: {$set->count()}\n";
}

joinAny() - first successful result

$set = new Async\TaskSet();

$set->spawn(fn() => searchProvider1($query));
$set->spawn(fn() => searchProvider2($query));
$set->spawn(fn() => searchProvider3($query));

// First successful result, entry removed
$result = $set->joinAny()->await();
echo "Found, active tasks: {$set->count()}\n";

foreach — streaming processing

$set = new Async\TaskSet(concurrency: 20);

foreach ($imageFiles as $file) {
    $set->spawn(fn() => processImage($file));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    if ($error !== null) {
        error_log("Error processing $key: {$error->getMessage()}");
        continue;
    }
    saveToStorage($result);
    // Entry removed, memory freed
}
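The result/error branching in the loop above can be factored into a small reusable function. The sketch below assumes only the entry shape used throughout this page (key => [$result, $error]). drainEntries() is a hypothetical helper, not part of True Async, and a generator stands in for a TaskSet so that the snippet runs on stock PHP.

```php
<?php
declare(strict_types=1);

/**
 * Consumes entries shaped like [$result, $error] and dispatches each one.
 * Returns [successCount, failureCount].
 * Hypothetical helper for illustration; not part of True Async.
 */
function drainEntries(iterable $entries, callable $onResult, callable $onError): array
{
    $ok = 0;
    $failed = 0;
    foreach ($entries as $key => [$result, $error]) {
        if ($error !== null) {
            $onError($key, $error);
            $failed++;
            continue;
        }
        $onResult($key, $result);
        $ok++;
    }
    return [$ok, $failed];
}

// Stand-in for a TaskSet: a generator that yields the same entry shape.
$fakeEntries = (function (): Generator {
    yield 'a.png' => ['thumb-a', null];
    yield 'b.png' => [null, new RuntimeException('corrupt file')];
    yield 'c.png' => ['thumb-c', null];
})();

$saved = [];
$errors = [];
[$ok, $failed] = drainEntries(
    $fakeEntries,
    function ($key, $result) use (&$saved): void { $saved[$key] = $result; },
    function ($key, Throwable $e) use (&$errors): void { $errors[$key] = $e->getMessage(); },
);
```

Because the helper accepts any iterable, a sealed TaskSet could presumably be passed in place of the generator, provided its iterator yields entries in the shape shown in the examples on this page.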

Worker loop with dynamic task addition

use function Async\spawn; // assumed import for the spawn() calls below

$set = new Async\TaskSet(concurrency: 10);

// One coroutine adds tasks
spawn(function() use ($set, $queue) {
    while ($message = $queue->receive()) {
        $set->spawn(fn() => processMessage($message));
    }
    $set->seal();
});

// Another processes results
spawn(function() use ($set) {
    foreach ($set as $key => [$result, $error]) {
        if ($error !== null) {
            error_log("Error: {$error->getMessage()}");
        }
    }
});

Equivalents in Other Languages

| Feature | PHP TaskSet | Python asyncio | Kotlin | Go |
|---|---|---|---|---|
| Dynamic set | spawn() + joinNext() | asyncio.as_completed() | Channel + select | errgroup + chan |
| Auto-cleanup | Automatic | Manual management | Manual management | Manual management |
| Concurrency limit | concurrency: N | Semaphore | Semaphore | Buffered channel |
| Streaming iteration | foreach | async for + as_completed | for + Channel | for range + chan |
