Класс Async\TaskSet

(PHP 8.6+, True Async 1.0)

Введение

TaskGroup отлично подходит для сценариев, когда все целью являются результаты, а не задачи. Но есть много ситуаций, когда нужно контролировать число задач, а результаты потребляться потоком.

Типичные примеры:

TaskSet спроектирована для решения данных проблем. Она удаляет завершённые задачи автоматически в момент доставки результата через joinNext(), joinAll(), joinAny() или foreach.

Отличия от TaskGroup

Свойство TaskGroup TaskSet
Хранение результатов Все результаты до явного запроса Удаляются после доставки
Повторный вызов методов Идемпотентный — тот же результат Каждый вызов — следующий элемент
count() Общее число задач Число задач, ещё не доставленных
Методы ожидания all(), race(), any() joinAll(), joinNext(), joinAny()
Итерация Записи остаются Записи удаляются после foreach
Сценарий Фиксированный набор задач Динамический поток задач

Идемпотентность vs потребление

Главное концептуальное отличие TaskSet от TaskGroup.

TaskGroup — идемпотентный. Вызовы race(), any(), all() всегда возвращают один и тот же результат. Итерация через foreach всегда обходит все задачи. Результаты хранятся в группе и доступны повторно:

$group = new Async\TaskGroup();

$group->spawn(fn() => "alpha");
$group->spawn(fn() => "beta");
$group->spawn(fn() => "gamma");
$group->seal();

// race() всегда возвращает одну и ту же первую завершившуюся задачу
$first  = $group->race()->await(); // "alpha"
$same   = $group->race()->await(); // "alpha" — тот же результат!

// all() всегда возвращает полный массив
$all1 = $group->all()->await(); // ["alpha", "beta", "gamma"]
$all2 = $group->all()->await(); // ["alpha", "beta", "gamma"] — тот же массив!

// foreach всегда обходит все элементы
foreach ($group as $key => [$result, $error]) { /* 3 итерации */ }
foreach ($group as $key => [$result, $error]) { /* снова 3 итерации */ }

echo $group->count(); // 3 — всегда 3

TaskSet — потребляющий. Каждый вызов joinNext() / joinAny() извлекает следующий элемент и удаляет его из набора. Повторный foreach не найдёт уже доставленных записей. Это поведение аналогично чтению из очереди или канала:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");

// joinNext() каждый раз возвращает СЛЕДУЮЩИЙ результат
$first  = $set->joinNext()->await(); // "alpha"
$second = $set->joinNext()->await(); // "beta" — другой результат!
$third  = $set->joinNext()->await(); // "gamma"

echo $set->count(); // 0 — набор пуст

// joinAll() после полного потребления — пустой массив
$set->seal();
$rest = $set->joinAll()->await(); // [] — нечего возвращать

Та же логика работает и для итерации:

$set = new Async\TaskSet();

$set->spawn(fn() => "alpha");
$set->spawn(fn() => "beta");
$set->spawn(fn() => "gamma");
$set->seal();

// Первый foreach потребляет все результаты
foreach ($set as $key => [$result, $error]) {
    echo "$result\n"; // "alpha", "beta", "gamma"
}

echo $set->count(); // 0

// Второй foreach — пуст, итерировать нечего
foreach ($set as $key => [$result, $error]) {
    echo "это не выполнится\n";
}

Правило: если вам нужно обращаться к результатам повторно — используйте TaskGroup. Если результаты обрабатываются однократно и должны освобождать память — используйте TaskSet.

Семантика join-методов

В отличие от TaskGroup, где race() / any() / all() оставляют записи в группе, TaskSet использует методы с семантикой join — результат доставлен, запись удалена:

Автоматическая очистка

Автоочистка работает во всех точках доставки результатов:

$set = new Async\TaskSet();

$set->spawn(fn() => "a");
$set->spawn(fn() => "b");
echo $set->count(); // 2

$set->joinNext()->await();
echo $set->count(); // 1

$set->joinNext()->await();
echo $set->count(); // 0

При итерации через foreach каждая обработанная запись удаляется немедленно:

$set = new Async\TaskSet();

foreach ($urls as $url) {
    $set->spawn(fn() => fetch($url));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    // $set->count() уменьшается с каждой итерацией
    process($result);
}

Лимит конкурентности

Как и TaskGroup, TaskSet поддерживает ограничение конкурентности:

$set = new Async\TaskSet(concurrency: 10);

foreach ($tasks as $task) {
    $set->spawn(fn() => processTask($task));
}

Задачи, превышающие лимит, помещаются в очередь и запускаются при освобождении слота.

Обзор класса

final class Async\TaskSet implements Async\Awaitable, Countable, IteratorAggregate {

    /* Методы */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Добавление задач */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Ожидание результатов (с автоочисткой) */
    public joinNext(): Async\Future
    public joinAny(): Async\Future
    public joinAll(bool $ignoreErrors = false): Async\Future

    /* Жизненный цикл */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* Состояние */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Ожидание завершения */
    public awaitCompletion(): void

    /* Итерация (с автоочисткой) */
    public getIterator(): Iterator
}

Примеры

joinAll() — параллельная загрузка с автоочисткой

$set = new Async\TaskSet();

$set->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$set->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$set->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$set->seal();
$data = $set->joinAll()->await();
// $set->count() === 0, все записи удалены

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

joinNext() — обработка задач по мере готовности

$set = new Async\TaskSet(concurrency: 5);

foreach ($urls as $url) {
    $set->spawn(fn() => httpClient()->get($url)->getBody());
}
$set->seal();

while ($set->count() > 0) {
    $result = $set->joinNext()->await();
    echo "Получен результат, осталось: {$set->count()}\n";
}

joinAny() — устойчивый поиск

$set = new Async\TaskSet();

$set->spawn(fn() => searchProvider1($query));
$set->spawn(fn() => searchProvider2($query));
$set->spawn(fn() => searchProvider3($query));

// Первый успешный результат, запись удалена
$result = $set->joinAny()->await();
echo "Найдено, активных задач: {$set->count()}\n";

foreach — потоковая обработка

$set = new Async\TaskSet(concurrency: 20);

foreach ($imageFiles as $file) {
    $set->spawn(fn() => processImage($file));
}
$set->seal();

foreach ($set as $key => [$result, $error]) {
    if ($error !== null) {
        log("Ошибка обработки $key: {$error->getMessage()}");
        continue;
    }
    saveToStorage($result);
    // Запись удалена, память освобождена
}

Worker-цикл с динамическим добавлением задач

$set = new Async\TaskSet(concurrency: 10);

// Одна корутина добавляет задачи
spawn(function() use ($set, $queue) {
    while ($message = $queue->receive()) {
        $set->spawn(fn() => processMessage($message));
    }
    $set->seal();
});

// Другая обрабатывает результаты
spawn(function() use ($set) {
    foreach ($set as $key => [$result, $error]) {
        if ($error !== null) {
            log("Ошибка: {$error->getMessage()}");
        }
    }
});

Аналоги в других языках

Возможность PHP TaskSet Python asyncio Kotlin Go
Динамический набор spawn() + joinNext() asyncio.as_completed() Channel + select errgroup + chan
Автоочистка Автоматически Ручное управление Ручное управление Ручное управление
Лимит конкурентности concurrency: N Semaphore Semaphore Буферизованный канал
Потоковая итерация foreach async for + as_completed for + Channel for range + chan

Содержание