Async\TaskGroup 클래스

(PHP 8.6+, True Async 1.0)

소개

코루틴으로 작업할 때 여러 작업을 실행하고 그 결과를 기다려야 하는 경우가 자주 있습니다. spawn()await()를 직접 사용하면 개발자가 모든 코루틴이 대기되거나 취소되었는지 확인해야 할 책임을 집니다. 잊혀진 코루틴은 계속 실행되고, 처리되지 않은 오류는 사라지며, 작업 그룹을 취소하려면 수동 코드가 필요합니다.

await_all()await_any() 함수는 서로 다른 작업 간의 논리적 관계를 고려하지 않습니다. 예를 들어, 여러 요청을 보내고 첫 번째 결과를 가져온 다음 나머지를 취소해야 할 때, await_any()는 나머지 작업을 취소하기 위해 프로그래머에게 추가 코드를 요구합니다. 이러한 코드는 상당히 복잡할 수 있으므로 await_all()await_any()는 이 상황에서 안티패턴으로 간주해야 합니다.

이 목적으로 Scope를 사용하는 것은 적합하지 않습니다. 작업 코루틴이 다른 자식 코루틴을 생성할 수 있어 프로그래머가 작업 코루틴 목록을 유지하고 별도로 추적해야 하기 때문입니다.

TaskGroup은 이 모든 문제를 해결합니다. 모든 작업이 적절히 대기되거나 취소됨을 보장하는 고수준 구조적 동시성 패턴입니다. 작업을 논리적으로 그룹화하고 하나의 단위로 조작할 수 있게 합니다.

대기 전략

TaskGroup은 결과를 기다리는 여러 전략을 제공합니다. 각각은 Future를 반환하며, 타임아웃을 전달할 수 있습니다: ->await(Async\timeout(5.0)).

  • all() -- 모든 작업 결과의 배열로 해결되는 Future를 반환합니다. 최소 하나의 작업이 예외를 던지면 CompositeException으로 거부됩니다. ignoreErrors: true 매개변수를 사용하면 성공한 결과만 반환합니다.
  • race() -- 성공 여부와 관계없이 첫 번째 완료된 작업의 결과로 해결되는 Future를 반환합니다. 다른 작업은 계속 실행됩니다.
  • any() -- 첫 번째 성공적으로 완료된 작업의 결과로 해결되는 Future를 반환하며, 오류를 무시합니다. 모든 작업이 실패하면 CompositeException으로 거부됩니다.
  • awaitCompletion() -- Scope 내의 다른 코루틴뿐만 아니라 모든 작업의 완전한 완료를 기다립니다.

동시성 제한

concurrency 매개변수가 지정되면 TaskGroup은 코루틴 풀로 작동합니다: 제한을 초과하는 작업은 대기열에서 기다리며 빈 슬롯이 나타날 때까지 코루틴을 생성하지 않습니다. 이는 대량의 작업을 처리할 때 메모리를 절약하고 부하를 제어합니다.

TaskGroup과 Scope

TaskGroup은 작업 코루틴의 생명주기를 관리하기 위해 Scope를 사용합니다. TaskGroup을 생성할 때 기존 Scope를 전달하거나 TaskGroup이 현재 스코프에서 자식 Scope를 생성하도록 할 수 있습니다. TaskGroup에 추가된 모든 작업은 이 Scope 내에서 실행됩니다. 이는 TaskGroup이 취소되거나 파괴될 때 모든 코루틴이 자동으로 취소되어 안전한 리소스 관리와 누수 방지를 보장합니다.

봉인과 반복

TaskGroupseal() 메서드를 사용하여 봉인할 때까지 동적으로 작업을 추가할 수 있습니다.

all() 메서드는 대기열의 모든 기존 작업이 완료되면 트리거되는 Future를 반환합니다. 이를 통해 작업이 동적으로 추가되는 루프에서 TaskGroup을 사용하고, 현재 작업 세트의 결과를 얻기 위해 all()을 호출할 수 있습니다.

TaskGroup은 결과가 준비되는 대로 반복하기 위한 foreach도 지원합니다. 이 경우 모든 작업을 추가한 후 seal()을 호출하여 새 작업이 없음을 알리고, 모든 결과를 처리한 후 foreach가 종료될 수 있도록 해야 합니다.

클래스 개요

php
final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {

    /* 메서드 */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* 작업 추가 */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* 결과 대기 */
    public all(bool $ignoreErrors = false): Async\Future
    public race(): Async\Future
    public any(): Async\Future
    public awaitCompletion(): void

    /* 생명주기 */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* 상태 */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* 결과와 오류 */
    public getResults(): array
    public getErrors(): array
    public suppressErrors(): void

    /* 반복 */
    public getIterator(): Iterator
}

예제

all() -- 병렬 데이터 로딩

가장 흔한 시나리오 -- 여러 소스에서 동시에 데이터를 로딩합니다:

php
$group = new Async\TaskGroup();

$group->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

세 요청 모두 병렬로 실행됩니다. 그 중 하나가 예외를 던지면 all()CompositeException으로 거부되는 Future를 반환합니다.

race() -- 헤지드 요청

"헤지드 요청" 패턴 -- 동일한 요청을 여러 레플리카에 보내고 첫 번째 응답을 취합니다. 이는 느리거나 과부하된 서버에서 지연 시간을 줄입니다:

php
$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];

$group = new Async\TaskGroup();

foreach ($replicas as $host) {
    $group->spawn(fn() => pg_query($host, 'SELECT * FROM products WHERE id = 42'));
}

// 첫 번째 응답이 결과가 되고, 다른 작업은 계속 실행됩니다
$product = $group->race()->await();

any() -- 오류 내성 검색

여러 제공자에 쿼리하고, 오류를 무시하면서 첫 번째 성공한 응답을 취합니다:

php
$group = new Async\TaskGroup();

$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));

// any()는 실패한 제공자를 무시하고 첫 번째 성공한 결과를 반환합니다
$results = $group->any()->await();

// 실패한 제공자의 오류는 명시적으로 처리해야 합니다, 그렇지 않으면 소멸자가 예외를 던집니다
$group->suppressErrors();

모든 제공자가 실패하면 any()는 모든 오류가 포함된 CompositeException을 던집니다.

동시성 제한 -- 큐 처리

10,000개의 작업을 처리하되 동시에 50개 이하로:

php
$group = new Async\TaskGroup(concurrency: 50);

foreach ($urls as $url) {
    $group->spawn(fn() => httpClient()->get($url)->getBody());
}

$results = $group->all()->await();

TaskGroup은 자동으로 작업을 대기열에 넣습니다. 빈 슬롯이 나타날 때만 코루틴이 생성되어 대량의 작업에서 메모리를 절약합니다.

완료되는 대로 결과 반복

모든 작업이 끝날 때까지 기다리지 않고 결과를 처리합니다:

php
$group = new Async\TaskGroup();

foreach ($imageFiles as $file) {
    $group->spawn(fn() => processImage($file));
}

$group->seal();

foreach ($group as $key => $result) {
    // 결과는 추가된 순서가 아니라 준비되는 대로 도착합니다
    saveToStorage($result);
}

작업 그룹 타임아웃

결과 대기 시간을 제한합니다:

php
$group = new Async\TaskGroup();

$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();

try {
    $results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
    echo "5초 내에 데이터를 가져오지 못했습니다";
}

다른 언어의 유사 기능

기능PHP TaskGroupPython asyncio.TaskGroupJava StructuredTaskScopeKotlin coroutineScope
구조적 동시성seal() + all()->await()async with 블록try-with-resources + join()스코프를 통해 자동
대기 전략all(), race(), any() -> Futureasync with만 가능ShutdownOnSuccess, ShutdownOnFailureasync/await, select
동시성 제한concurrency: N없음 (Semaphore 필요)없음없음 (Semaphore 필요)
결과 반복완료 순서대로 foreach없음없음Channel
오류 처리CompositeException, getErrors()ExceptionGroupthrowIfFailed()예외가 스코프를 취소

PHP TaskGroup은 다른 언어에서 여러 프리미티브에 분산되어 있는 기능을 결합합니다: 세마포어 없는 동시성 제한, 단일 객체의 다양한 대기 전략, 완료 순서대로의 결과 반복.

목차