Класс Async\TaskGroup

(PHP 8.6+, True Async 1.0)

Введение

При работе с корутинами часто нужно запустить несколько задач и дождаться их результатов. Используя spawn() и await() напрямую, разработчик берёт на себя ответственность за то, чтобы каждая корутина была ожидаема или отменена. Забытая корутина продолжает работать, необработанная ошибка теряется, а отмена группы задач требует ручного кода.

Функции await_all(), await_any() не учитывают логические связи между разными задачами. Например, когда нужно сделать несколько запросов, взять первый результат и отменить остальные, await_any() требует от программиста дополнительного кода для отмены оставшихся задач. Такой код может быть достаточно сложным, поэтому функции await_all(), await_any() стоит рассматривать как антипатерны в этой ситуации.

Использование Scope для этой цели не подходит, так как корутины-задачи могут создавать другие дочерние корутины, что требует от программиста реализовывать список корутин-задач и отдельно отслеживать их.

TaskGroup решает все эти проблемы. Это высокоуровневый паттерн structured concurrency, который гарантирует: все задачи будут корректно ожидаемы или отменены. Он логические объединяет задачи, и позволяет оперировать ими как единым целым.

Стратегии ожидания

TaskGroup предоставляет несколько стратегий ожидания результатов. Каждая из них возвращает Future, что позволяет передать таймаут: ->await(Async\timeout(5.0)).

  • all() — возвращает Future, который разрешится массивом результатов всех задач, или реджектится с CompositeException, если хотя бы одна задача бросила исключение. С параметром ignoreErrors: true возвращает только успешные результаты.
  • race() — возвращает Future, который разрешится результатом первой завершившейся задачи, независимо от того, успешно она завершилась или нет. Остальные задачи продолжают работать.
  • any() — возвращает Future, который разрешится результатом первой успешно завершившейся задачи, игнорируя ошибки. Если все задачи упали — реджектится с CompositeException.
  • awaitCompletion() — ожидает полного завершения всех задач, а также других корутин в Scope.

Лимит конкурентности

При указании параметра concurrency TaskGroup работает как пул корутин: задачи, превышающие лимит, ожидают в очереди и не создают корутину до появления свободного слота. Это позволяет экономить память и контролировать нагрузку при обработке большого числа задач.

TaskGroup и Scope

TaskGroup использует Scope для управления жизненным циклом корутин-задач. При создании TaskGroup можно передать существующий Scope или позволить TaskGroup создать дочерний Scope от текущего. Все задачи, добавленные в TaskGroup, выполняются внутри этого Scope. Значит при отмене TaskGroup или при его уничтожении, все корутины будут автоматически отменены, что обеспечивает безопасное управление ресурсами и предотвращает утечки.

Запечатывание и итерация

TaskGroup позволяет добавлять задачи динамически, до тех пор пока он не будет запечатан с помощью метода seal().

Метод all() возвращает Future, который срабатывает, когда все существующие задачи в очереди завершены. Это позволяет использовать TaskGroup в цикле, где задачи добавляются динамически, а all() вызывается для получения результатов текущего набора задач.

TaskGroup также поддерживает foreach для итерации по результатам по мере их готовности. В этом случае seal() необходимо вызвать после добавления всех задач, чтобы сигнализировать о том, что новых задач не будет, и foreach может завершиться после обработки всех результатов.

Обзор класса

php
final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {

    /* Методы */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* Добавление задач */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* Ожидание результатов */
    public all(bool $ignoreErrors = false): Async\Future
    public race(): Async\Future
    public any(): Async\Future
    public awaitCompletion(): void

    /* Жизненный цикл */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* Состояние */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* Результаты и ошибки */
    public getResults(): array
    public getErrors(): array
    public suppressErrors(): void

    /* Итерация */
    public getIterator(): Iterator
}

Примеры

all() — параллельная загрузка данных

Самый частый сценарий — загрузить данные из нескольких источников одновременно:

php
$group = new Async\TaskGroup();

$group->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

Все три запроса выполняются параллельно. Если любой из них бросит исключение, all() вернёт Future, который реджектится с CompositeException.

race() — hedged requests

Паттерн «hedged request» — отправить один и тот же запрос на несколько реплик и взять первый ответ. Это снижает задержку при медленных или перегруженных серверах:

php
$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];

$group = new Async\TaskGroup();

foreach ($replicas as $host) {
    $group->spawn(fn() => pg_query($host, 'SELECT * FROM products WHERE id = 42'));
}

// Первый ответ — результат, остальные задачи продолжают работать
$product = $group->race()->await();

any() — устойчивый к ошибкам поиск

Запросить несколько провайдеров, взять первый успешный ответ, игнорируя ошибки:

php
$group = new Async\TaskGroup();

$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));

// any() игнорирует провайдеры, которые упали, и возвращает первый успешный результат
$results = $group->any()->await();

// Ошибки упавших провайдеров нужно явно обработать, иначе деструктор бросит исключение
$group->suppressErrors();

Если все провайдеры упали, any() выбросит CompositeException со всеми ошибками.

Лимит конкурентности — обработка очереди

Обработать 10 000 задач, но не более 50 одновременно:

php
$group = new Async\TaskGroup(concurrency: 50);

foreach ($urls as $url) {
    $group->spawn(fn() => httpClient()->get($url)->getBody());
}

$results = $group->all()->await();

TaskGroup автоматически ставит задачи в очередь. Корутина создаётся только тогда, когда появляется свободный слот, что экономит память при большом объёме задач.

Итерация по результатам по мере готовности

Обрабатывать результаты, не дожидаясь завершения всех задач:

php
$group = new Async\TaskGroup();

foreach ($imageFiles as $file) {
    $group->spawn(fn() => processImage($file));
}

$group->seal();

foreach ($group as $key => $result) {
    // Результаты приходят по мере готовности, а не в порядке добавления
    saveToStorage($result);
}

Таймаут для группы задач

Ограничить время ожидания результатов:

php
$group = new Async\TaskGroup();

$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();

try {
    $results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
    echo "Не удалось получить данные за 5 секунд";
}

Аналоги в других языках

ВозможностьPHP TaskGroupPython asyncio.TaskGroupJava StructuredTaskScopeKotlin coroutineScope
Structured concurrencyseal() + all()->await()async with блокtry-with-resources + join()Автоматически через scope
Стратегии ожиданияall(), race(), any() → FutureТолько all (через async with)ShutdownOnSuccess, ShutdownOnFailureasync/await, select
Лимит конкурентностиconcurrency: NНет (нужен Semaphore)НетНет (нужен Semaphore)
Итерация по результатамforeach по мере готовностиНетНетChannel
Обработка ошибокCompositeException, getErrors()ExceptionGroupthrowIfFailed()Исключение отменяет scope

PHP TaskGroup объединяет возможности, которые в других языках разнесены по нескольким примитивам: лимит конкурентности без семафора, несколько стратегий ожидания в одном объекте и итерация по результатам по мере готовности.

Содержание