Класс Async\TaskGroup

(PHP 8.6+, True Async 1.0)

Введение

При работе с корутинами часто нужно запустить несколько задач и дождаться их результатов. Используя spawn() и await() напрямую, разработчик берёт на себя ответственность за то, чтобы каждая корутина была ожидаема или отменена. Забытая корутина продолжает работать, необработанная ошибка теряется, а отмена группы задач требует ручного кода.

Функции await_all(), await_any() не учитывают логические связи между разными задачами. Например, когда нужно сделать несколько запросов, взять первый результат и отменить остальные, await_any() требует от программиста дополнительного кода для отмены оставшихся задач. Такой код может быть достаточно сложным, поэтому функции await_all(), await_any() стоит рассматривать как антипатерны в этой ситуации.

Использование Scope для этой цели не подходит, так как корутины-задачи могут создавать другие дочерние корутины, что требует от программиста реализовывать список корутин-задач и отдельно отслеживать их.

TaskGroup решает все эти проблемы. Это высокоуровневый паттерн structured concurrency, который гарантирует: все задачи будут корректно ожидаемы или отменены. Он логические объединяет задачи, и позволяет оперировать ими как единым целым.

Стратегии ожидания

TaskGroup предоставляет несколько стратегий ожидания результатов. Каждая из них возвращает Future, что позволяет передать таймаут: ->await(Async\timeout(5.0)).

Лимит конкурентности

При указании параметра concurrency TaskGroup работает как пул корутин: задачи, превышающие лимит, ожидают в очереди и не создают корутину до появления свободного слота. Это позволяет экономить память и контролировать нагрузку при обработке большого числа задач.

TaskGroup и Scope

TaskGroup использует Scope для управления жизненным циклом корутин-задач. При создании TaskGroup можно передать существующий Scope или позволить TaskGroup создать дочерний Scope от текущего. Все задачи, добавленные в TaskGroup, выполняются внутри этого Scope. Значит при отмене TaskGroup или при его уничтожении, все корутины будут автоматически отменены, что обеспечивает безопасное управление ресурсами и предотвращает утечки.

Запечатывание и итерация

TaskGroup позволяет добавлять задачи динамически, до тех пор пока он не будет запечатан с помощью метода seal().

Метод all() возвращает Future, который срабатывает, когда все существующие задачи в очереди завершены. Это позволяет использовать TaskGroup в цикле, где задачи добавляются динамически, а all() вызывается для получения результатов текущего набора задач.

TaskGroup также поддерживает foreach для итерации по результатам по мере их готовности. В этом случае seal() необходимо вызвать после добавления всех задач, чтобы сигнализировать о том, что новых задач не будет, и foreach может завершиться после обработки всех результатов.

Обзор класса

final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {

    /* Методы */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Добавление задач */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Ожидание результатов */
    public all(bool $ignoreErrors = false): Async\Future
    public race(): Async\Future
    public any(): Async\Future
    public awaitCompletion(): void

    /* Жизненный цикл */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* Состояние */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Результаты и ошибки */
    public getResults(): array
    public getErrors(): array
    public suppressErrors(): void

    /* Итерация */
    public getIterator(): Iterator
}

Примеры

all() — параллельная загрузка данных

Самый частый сценарий — загрузить данные из нескольких источников одновременно:

$group = new Async\TaskGroup();

$group->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

Все три запроса выполняются параллельно. Если любой из них бросит исключение, all() вернёт Future, который реджектится с CompositeException.

race() — hedged requests

Паттерн «hedged request» — отправить один и тот же запрос на несколько реплик и взять первый ответ. Это снижает задержку при медленных или перегруженных серверах:

$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];

$group = new Async\TaskGroup();

foreach ($replicas as $host) {
    $group->spawn(fn() => pg_query($host, 'SELECT * FROM products WHERE id = 42'));
}

// Первый ответ — результат, остальные задачи продолжают работать
$product = $group->race()->await();

any() — устойчивый к ошибкам поиск

Запросить несколько провайдеров, взять первый успешный ответ, игнорируя ошибки:

$group = new Async\TaskGroup();

$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));

// any() игнорирует провайдеры, которые упали, и возвращает первый успешный результат
$results = $group->any()->await();

// Ошибки упавших провайдеров нужно явно обработать, иначе деструктор бросит исключение
$group->suppressErrors();

Если все провайдеры упали, any() выбросит CompositeException со всеми ошибками.

Лимит конкурентности — обработка очереди

Обработать 10 000 задач, но не более 50 одновременно:

$group = new Async\TaskGroup(concurrency: 50);

foreach ($urls as $url) {
    $group->spawn(fn() => httpClient()->get($url)->getBody());
}

$results = $group->all()->await();

TaskGroup автоматически ставит задачи в очередь. Корутина создаётся только тогда, когда появляется свободный слот, что экономит память при большом объёме задач.

Итерация по результатам по мере готовности

Обрабатывать результаты, не дожидаясь завершения всех задач:

$group = new Async\TaskGroup();

foreach ($imageFiles as $file) {
    $group->spawn(fn() => processImage($file));
}

$group->seal();

foreach ($group as $key => $result) {
    // Результаты приходят по мере готовности, а не в порядке добавления
    saveToStorage($result);
}

Таймаут для группы задач

Ограничить время ожидания результатов:

$group = new Async\TaskGroup();

$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();

try {
    $results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
    echo "Не удалось получить данные за 5 секунд";
}

Аналоги в других языках

Возможность PHP TaskGroup Python asyncio.TaskGroup Java StructuredTaskScope Kotlin coroutineScope
Structured concurrency seal() + all()->await() async with блок try-with-resources + join() Автоматически через scope
Стратегии ожидания all(), race(), any() → Future Только all (через async with) ShutdownOnSuccess, ShutdownOnFailure async/await, select
Лимит конкурентности concurrency: N Нет (нужен Semaphore) Нет Нет (нужен Semaphore)
Итерация по результатам foreach по мере готовности Нет Нет Channel
Обработка ошибок CompositeException, getErrors() ExceptionGroup throwIfFailed() Исключение отменяет scope

PHP TaskGroup объединяет возможности, которые в других языках разнесены по нескольким примитивам: лимит конкурентности без семафора, несколько стратегий ожидания в одном объекте и итерация по результатам по мере готовности.

Содержание