Async\TaskGroup 클래스

(PHP 8.6+, True Async 1.0)

소개

코루틴으로 작업할 때 여러 작업을 실행하고 그 결과를 기다려야 하는 경우가 자주 있습니다. spawn()await()를 직접 사용하면 개발자가 모든 코루틴이 대기되거나 취소되었는지 확인해야 할 책임을 집니다. 잊혀진 코루틴은 계속 실행되고, 처리되지 않은 오류는 사라지며, 작업 그룹을 취소하려면 수동 코드가 필요합니다.

await_all()await_any() 함수는 서로 다른 작업 간의 논리적 관계를 고려하지 않습니다. 예를 들어, 여러 요청을 보내고 첫 번째 결과를 가져온 다음 나머지를 취소해야 할 때, await_any()는 나머지 작업을 취소하기 위해 프로그래머에게 추가 코드를 요구합니다. 이러한 코드는 상당히 복잡할 수 있으므로 await_all()await_any()는 이 상황에서 안티패턴으로 간주해야 합니다.

이 목적으로 Scope를 사용하는 것은 적합하지 않습니다. 작업 코루틴이 다른 자식 코루틴을 생성할 수 있어 프로그래머가 작업 코루틴 목록을 유지하고 별도로 추적해야 하기 때문입니다.

TaskGroup은 이 모든 문제를 해결합니다. 모든 작업이 적절히 대기되거나 취소됨을 보장하는 고수준 구조적 동시성 패턴입니다. 작업을 논리적으로 그룹화하고 하나의 단위로 조작할 수 있게 합니다.

대기 전략

TaskGroup은 결과를 기다리는 여러 전략을 제공합니다. 각각은 Future를 반환하며, 타임아웃을 전달할 수 있습니다: ->await(Async\timeout(5.0)).

동시성 제한

concurrency 매개변수가 지정되면 TaskGroup은 코루틴 풀로 작동합니다: 제한을 초과하는 작업은 대기열에서 기다리며 빈 슬롯이 나타날 때까지 코루틴을 생성하지 않습니다. 이는 대량의 작업을 처리할 때 메모리를 절약하고 부하를 제어합니다.

TaskGroup과 Scope

TaskGroup은 작업 코루틴의 생명주기를 관리하기 위해 Scope를 사용합니다. TaskGroup을 생성할 때 기존 Scope를 전달하거나 TaskGroup이 현재 스코프에서 자식 Scope를 생성하도록 할 수 있습니다. TaskGroup에 추가된 모든 작업은 이 Scope 내에서 실행됩니다. 이는 TaskGroup이 취소되거나 파괴될 때 모든 코루틴이 자동으로 취소되어 안전한 리소스 관리와 누수 방지를 보장합니다.

봉인과 반복

TaskGroupseal() 메서드를 사용하여 봉인할 때까지 동적으로 작업을 추가할 수 있습니다.

all() 메서드는 대기열의 모든 기존 작업이 완료되면 트리거되는 Future를 반환합니다. 이를 통해 작업이 동적으로 추가되는 루프에서 TaskGroup을 사용하고, 현재 작업 세트의 결과를 얻기 위해 all()을 호출할 수 있습니다.

TaskGroup은 결과가 준비되는 대로 반복하기 위한 foreach도 지원합니다. 이 경우 모든 작업을 추가한 후 seal()을 호출하여 새 작업이 없음을 알리고, 모든 결과를 처리한 후 foreach가 종료될 수 있도록 해야 합니다.

클래스 개요

final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {

    /* 메서드 */
    public __construct(?int $concurrency = null, ?Async\Scope $scope = null)

    /* 작업 추가 */
    public spawn(callable $task, mixed ...$args): void
    public spawnWithKey(string|int $key, callable $task, mixed ...$args): void

    /* 결과 대기 */
    public all(bool $ignoreErrors = false): Async\Future
    public race(): Async\Future
    public any(): Async\Future
    public awaitCompletion(): void

    /* 생명주기 */
    public seal(): void
    public cancel(?Async\AsyncCancellation $cancellation = null): void
    public dispose(): void
    public finally(Closure $callback): void

    /* 상태 */
    public isFinished(): bool
    public isSealed(): bool
    public count(): int

    /* 결과와 오류 */
    public getResults(): array
    public getErrors(): array
    public suppressErrors(): void

    /* 반복 */
    public getIterator(): Iterator
}

예제

all() – 병렬 데이터 로딩

가장 흔한 시나리오 – 여러 소스에서 동시에 데이터를 로딩합니다:

$group = new Async\TaskGroup();

$group->spawnWithKey('user',    fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders',  fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));

$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]

return new UserProfile($data['user'], $data['orders'], $data['reviews']);

세 요청 모두 병렬로 실행됩니다. 그 중 하나가 예외를 던지면 all()CompositeException으로 거부되는 Future를 반환합니다.

race() – 헤지드 요청

“헤지드 요청” 패턴 – 동일한 요청을 여러 레플리카에 보내고 첫 번째 응답을 취합니다. 이는 느리거나 과부하된 서버에서 지연 시간을 줄입니다:

$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];

$group = new Async\TaskGroup();

foreach ($replicas as $host) {
    $group->spawn(fn() => pg_query($host, 'SELECT * FROM products WHERE id = 42'));
}

// 첫 번째 응답이 결과가 되고, 다른 작업은 계속 실행됩니다
$product = $group->race()->await();

any() – 오류 내성 검색

여러 제공자에 쿼리하고, 오류를 무시하면서 첫 번째 성공한 응답을 취합니다:

$group = new Async\TaskGroup();

$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));

// any()는 실패한 제공자를 무시하고 첫 번째 성공한 결과를 반환합니다
$results = $group->any()->await();

// 실패한 제공자의 오류는 명시적으로 처리해야 합니다, 그렇지 않으면 소멸자가 예외를 던집니다
$group->suppressErrors();

모든 제공자가 실패하면 any()는 모든 오류가 포함된 CompositeException을 던집니다.

동시성 제한 – 큐 처리

10,000개의 작업을 처리하되 동시에 50개 이하로:

$group = new Async\TaskGroup(concurrency: 50);

foreach ($urls as $url) {
    $group->spawn(fn() => httpClient()->get($url)->getBody());
}

$results = $group->all()->await();

TaskGroup은 자동으로 작업을 대기열에 넣습니다. 빈 슬롯이 나타날 때만 코루틴이 생성되어 대량의 작업에서 메모리를 절약합니다.

완료되는 대로 결과 반복

모든 작업이 끝날 때까지 기다리지 않고 결과를 처리합니다:

$group = new Async\TaskGroup();

foreach ($imageFiles as $file) {
    $group->spawn(fn() => processImage($file));
}

$group->seal();

foreach ($group as $key => $result) {
    // 결과는 추가된 순서가 아니라 준비되는 대로 도착합니다
    saveToStorage($result);
}

작업 그룹 타임아웃

결과 대기 시간을 제한합니다:

$group = new Async\TaskGroup();

$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();

try {
    $results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
    echo "5초 내에 데이터를 가져오지 못했습니다";
}

다른 언어의 유사 기능

기능 PHP TaskGroup Python asyncio.TaskGroup Java StructuredTaskScope Kotlin coroutineScope
구조적 동시성 seal() + all()->await() async with 블록 try-with-resources + join() 스코프를 통해 자동
대기 전략 all(), race(), any() -> Future async with만 가능 ShutdownOnSuccess, ShutdownOnFailure async/await, select
동시성 제한 concurrency: N 없음 (Semaphore 필요) 없음 없음 (Semaphore 필요)
결과 반복 완료 순서대로 foreach 없음 없음 Channel
오류 처리 CompositeException, getErrors() ExceptionGroup throwIfFailed() 예외가 스코프를 취소

PHP TaskGroup은 다른 언어에서 여러 프리미티브에 분산되어 있는 기능을 결합합니다: 세마포어 없는 동시성 제한, 단일 객체의 다양한 대기 전략, 완료 순서대로의 결과 반복.

목차