Die Klasse Async\TaskSet

(PHP 8.6+, True Async 1.0)

Einleitung

TaskGroup eignet sich hervorragend für Szenarien, in denen es um die Ergebnisse geht und nicht um die Tasks selbst. Es gibt jedoch viele Situationen, in denen die Anzahl der Tasks kontrolliert werden muss, während die Ergebnisse als Stream konsumiert werden.

Typische Beispiele:

TaskSet wurde entwickelt, um diese Probleme zu lösen. Es entfernt abgeschlossene Tasks automatisch bei der Ergebnisauslieferung über joinNext(), joinAll(), joinAny() oder foreach.

Unterschiede zu TaskGroup

Eigenschaft TaskGroup TaskSet
Ergebnisspeicherung Alle Ergebnisse bis zur expliziten Abfrage Nach Auslieferung entfernt
Wiederholte Methodenaufrufe Idempotent — gleiches Ergebnis Jeder Aufruf — nächstes Element
count() Gesamtanzahl der Tasks Anzahl nicht ausgelieferter Tasks
Wartemethoden all(), race(), any() joinAll(), joinNext(), joinAny()
Iteration Einträge bleiben erhalten Einträge werden nach foreach entfernt
Anwendungsfall Feste Menge von Tasks Dynamischer Task-Stream

Idempotenz vs. Konsumierung

Der zentrale konzeptuelle Unterschied zwischen TaskSet und TaskGroup.

TaskGroup ist idempotent. Aufrufe von race(), any(), all() liefern stets dasselbe Ergebnis. Die Iteration über foreach durchläuft immer alle Tasks. Ergebnisse werden in der Gruppe gespeichert und sind wiederholt abrufbar:

$group = new Async\TaskGroup();

$group->spawn(fn() => "alpha");
$group->spawn(fn() => "beta");
$group->spawn(fn() => "gamma");
$group->seal();

// race() liefert immer denselben zuerst abgeschlossenen Task
$first  = $group->race()->await(); // "alpha"
$same   = $group->race()->await(); // "alpha" — dasselbe Ergebnis!

// all() liefert immer das vollständige Array
$all1 = $group->all()->await(); // ["alpha", "beta", "gamma"]
$all2 = $group->all()->await(); // ["alpha", "beta", "gamma"] — dasselbe Array!

// foreach durchläuft immer alle Elemente
foreach ($group as $key => [$result, $error]) { /* 3 Iterationen */ }
foreach ($group as $key => [$result, $error]) { /* erneut 3 Iterationen */ }

echo $group->count(); // 3 — immer 3

TaskSet ist konsumierend. Jeder Aufruf von joinNext() / joinAny() entnimmt das nächste Element und entfernt es aus dem Set. Ein erneuter foreach findet bereits ausgelieferte Einträge nicht mehr. Dieses Verhalten ist vergleichbar mit dem Lesen aus einer Queue oder einem Channel:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");

// joinNext() liefert jedes Mal das NÄCHSTE Ergebnis
$first  = $set->joinNext()->await(); // "alpha"
$second = $set->joinNext()->await(); // "beta" — anderes Ergebnis!
$third  = $set->joinNext()->await(); // "gamma"

echo $set->count(); // 0 — Set ist leer

// joinAll() nach vollständiger Konsumierung — leeres Array
$set->seal();
$rest = $set->joinAll()->await(); // [] — nichts mehr vorhanden

Dieselbe Logik gilt auch für die Iteration:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");
$set->seal();

// Erstes foreach konsumiert alle Ergebnisse
foreach ($set as $key => [$result, $error]) {
    echo "$result\n"; // "alpha", "beta", "gamma"
}

echo $set->count(); // 0

// Zweites foreach — leer, nichts zu iterieren
foreach ($set as $key => [$result, $error]) {
    echo "wird nicht ausgeführt\n";
}

Regel: Wenn Sie wiederholt auf Ergebnisse zugreifen müssen — verwenden Sie TaskGroup. Wenn Ergebnisse einmalig verarbeitet werden und Speicher freigeben sollen — verwenden Sie TaskSet.

Semantik der Join-Methoden

Im Gegensatz zu TaskGroup, wo race() / any() / all() die Einträge in der Gruppe belassen, verwendet TaskSet Methoden mit Join-Semantik — Ergebnis ausgeliefert, Eintrag entfernt:

Automatische Bereinigung

Die automatische Bereinigung greift an allen Punkten der Ergebnisauslieferung:

$set = new Async\TaskSet();

$set->spawn(fn() => "a");
$set->spawn(fn() => "b");
echo $set->count(); // 2

$set->joinNext()->await();
echo $set->count(); // 1

$set->joinNext()->await();
echo $set->count(); // 0

Bei der Iteration über foreach wird jeder verarbeitete Eintrag sofort entfernt:

$set = new Async\TaskSet();

foreach ($urls as $url) {
    $set->spawn(fn() => fetch($url));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    // $set->count() verringert sich mit jeder Iteration
    process($result);
}

Nebenläufigkeitslimit

Wie TaskGroup unterstützt auch TaskSet eine Begrenzung der Nebenläufigkeit:

$set = new Async\TaskSet(concurrency: 10);

foreach ($tasks as $task) {
    $set->spawn(fn() => processTask($task));
}

Tasks, die das Limit überschreiten, werden in eine Warteschlange eingereiht und gestartet, sobald ein Slot frei wird.

Klassenübersicht

final class Async\TaskSet implements Async\Awaitable, Countable, IteratorAggregate {

    /* Methoden */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Tasks hinzufügen */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Auf Ergebnisse warten (mit automatischer Bereinigung) */
    public joinNext(): Async\Future
    public joinAny(): Async\Future
    public joinAll(bool $ignoreErrors = false): Async\Future

    /* Lebenszyklus */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* Zustand */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Auf Abschluss warten */
    public awaitCompletion(): void

    /* Iteration (mit automatischer Bereinigung) */
    public getIterator(): Iterator
}

Beispiele

joinAll() — paralleles Laden mit automatischer Bereinigung

$set = new Async\TaskSet();

$set->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$set->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$set->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$set->seal();
$data = $set->joinAll()->await();
// $set->count() === 0, alle Einträge entfernt

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

joinNext() — Tasks verarbeiten, sobald sie fertig sind

$set = new Async\TaskSet(concurrency: 5);

foreach ($urls as $url) {
    $set->spawn(fn() => httpClient()->get($url)->getBody());
}
$set->seal();

while ($set->count() > 0) {
    $result = $set->joinNext()->await();
    echo "Ergebnis erhalten, verbleibend: {$set->count()}\n";
}

joinAny() — fehlertolerante Suche

$set = new Async\TaskSet();

$set->spawn(fn() => searchProvider1($query));
$set->spawn(fn() => searchProvider2($query));
$set->spawn(fn() => searchProvider3($query));

// Erstes erfolgreiches Ergebnis, Eintrag entfernt
$result = $set->joinAny()->await();
echo "Gefunden, aktive Tasks: {$set->count()}\n";

foreach — Stream-Verarbeitung

$set = new Async\TaskSet(concurrency: 20);

foreach ($imageFiles as $file) {
    $set->spawn(fn() => processImage($file));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    if ($error !== null) {
        log("Fehler bei der Verarbeitung von $key: {$error->getMessage()}");
        continue;
    }
    saveToStorage($result);
    // Eintrag entfernt, Speicher freigegeben
}

Worker-Schleife mit dynamischem Hinzufügen von Tasks

$set = new Async\TaskSet(concurrency: 10);

// Eine Coroutine fügt Tasks hinzu
spawn(function() use ($set, $queue) {
    while ($message = $queue->receive()) {
        $set->spawn(fn() => processMessage($message));
    }
    $set->seal();
});

// Eine andere verarbeitet die Ergebnisse
spawn(function() use ($set) {
    foreach ($set as $key => [$result, $error]) {
        if ($error !== null) {
            log("Fehler: {$error->getMessage()}");
        }
    }
});

Entsprechungen in anderen Sprachen

Funktion PHP TaskSet Python asyncio Kotlin Go
Dynamisches Set spawn() + joinNext() asyncio.as_completed() Channel + select errgroup + chan
Automatische Bereinigung Automatisch Manuelle Verwaltung Manuelle Verwaltung Manuelle Verwaltung
Nebenläufigkeitslimit concurrency: N Semaphore Semaphore Gepufferter Channel
Stream-Iteration foreach async for + as_completed for + Channel for range + chan

Inhalt