La Classe Async\TaskGroup
(PHP 8.6+, True Async 1.0)
Introduzione
Quando si lavora con le coroutine, spesso è necessario lanciare diversi task e attendere i loro risultati.
Usando direttamente spawn() e await(), lo sviluppatore si assume la responsabilità di garantire
che ogni coroutine venga attesa o cancellata. Una coroutine dimenticata continua a funzionare,
un errore non gestito va perso e la cancellazione di un gruppo di task richiede codice manuale.
Le funzioni await_all() e await_any() non tengono conto delle relazioni logiche tra task diversi.
Per esempio, quando devi fare diverse richieste, prendere il primo risultato e cancellare il resto,
await_any() richiede codice aggiuntivo dal programmatore per cancellare i task rimanenti.
Tale codice può essere piuttosto complesso, quindi await_all() e await_any() dovrebbero essere considerati
anti-pattern in questa situazione.
Usare Scope per questo scopo non è adatto, poiché le coroutine dei task possono creare altre coroutine figlie,
il che richiede al programmatore di mantenere una lista delle coroutine dei task e tracciarle separatamente.
TaskGroup risolve tutti questi problemi. È un pattern di concorrenza strutturata di alto livello che garantisce: tutti i task verranno correttamente attesi o cancellati. Raggruppa logicamente i task e permette di operare su di essi come un’unità singola.
Strategie di Attesa
TaskGroup fornisce diverse strategie per attendere i risultati.
Ognuna restituisce un Future, che permette di passare un timeout: ->await(Async\timeout(5.0)).
all()– restituisce unFutureche si risolve con un array di tutti i risultati dei task, o viene rifiutato conCompositeExceptionse almeno un task ha lanciato un’eccezione. Con il parametroignoreErrors: true, restituisce solo i risultati riusciti.race()– restituisce unFutureche si risolve con il risultato del primo task completato, indipendentemente dal fatto che sia completato con successo o meno. Gli altri task continuano a funzionare.any()– restituisce unFutureche si risolve con il risultato del primo task completato con successo, ignorando gli errori. Se tutti i task falliscono – viene rifiutato conCompositeException.awaitCompletion()– attende il completamento totale di tutti i task, così come delle altre coroutine nelloScope.
Limite di Concorrenza
Quando viene specificato il parametro concurrency, TaskGroup funziona come un pool di coroutine:
i task che superano il limite attendono in coda e non creano una coroutine finché non si libera uno slot.
Questo risparmia memoria e controlla il carico durante l’elaborazione di un gran numero di task.
TaskGroup e Scope
TaskGroup usa Scope per gestire il ciclo di vita delle coroutine dei task.
Quando si crea un TaskGroup, puoi passare uno Scope esistente o lasciare che TaskGroup crei uno Scope figlio da quello corrente.
Tutti i task aggiunti a TaskGroup vengono eseguiti all’interno di questo Scope.
Questo significa che quando TaskGroup viene cancellato o distrutto,
tutte le coroutine verranno automaticamente cancellate, garantendo una gestione sicura delle risorse e prevenendo perdite.
Sigillatura e Iterazione
TaskGroup permette di aggiungere task dinamicamente, finché non viene
sigillato usando il metodo seal().
Il metodo all() restituisce un Future che si attiva quando tutti i task esistenti
nella coda sono completati. Questo permette di usare TaskGroup in un ciclo, dove i task vengono aggiunti dinamicamente,
e all() viene chiamato per ottenere i risultati dell’insieme corrente di task.
TaskGroup supporta anche foreach per iterare sui risultati man mano che diventano pronti.
In questo caso, seal() deve essere chiamato dopo aver aggiunto tutti i task per segnalare che
non ci saranno nuovi task, e foreach può terminare dopo aver elaborato tutti i risultati.
Panoramica della Classe
final class Async\TaskGroup implements Async\Awaitable, Countable, IteratorAggregate {
/* Metodi */
public __construct(?int $concurrency = null, ?Async\Scope $scope = null)
/* Aggiunta di task */
public spawn(callable $task, mixed ...$args): void
public spawnWithKey(string|int $key, callable $task, mixed ...$args): void
/* Attesa dei risultati */
public all(bool $ignoreErrors = false): Async\Future
public race(): Async\Future
public any(): Async\Future
public awaitCompletion(): void
/* Ciclo di vita */
public seal(): void
public cancel(?Async\AsyncCancellation $cancellation = null): void
public dispose(): void
public finally(Closure $callback): void
/* Stato */
public isFinished(): bool
public isSealed(): bool
public count(): int
/* Risultati ed errori */
public getResults(): array
public getErrors(): array
public suppressErrors(): void
/* Iterazione */
public getIterator(): Iterator
}
Esempi
all() – Caricamento Parallelo dei Dati
Lo scenario più comune – caricamento di dati da più sorgenti simultaneamente:
$group = new Async\TaskGroup();
$group->spawnWithKey('user', fn() => $db->query('SELECT * FROM users WHERE id = ?', [$id]));
$group->spawnWithKey('orders', fn() => $db->query('SELECT * FROM orders WHERE user_id = ?', [$id]));
$group->spawnWithKey('reviews', fn() => $api->get("/users/{$id}/reviews"));
$data = $group->all()->await();
// ['user' => ..., 'orders' => [...], 'reviews' => [...]]
return new UserProfile($data['user'], $data['orders'], $data['reviews']);
Tutte e tre le richieste vengono eseguite in parallelo. Se una di esse lancia un’eccezione,
all() restituisce un Future che viene rifiutato con CompositeException.
race() – Richieste Hedged
Il pattern “hedged request” – invia la stessa richiesta a più repliche e prendi la prima risposta. Questo riduce la latenza con server lenti o sovraccarichi:
$replicas = ['db-replica-1', 'db-replica-2', 'db-replica-3'];
$group = new Async\TaskGroup();
foreach ($replicas as $host) {
$group->spawn(fn() => pg_query($host, 'SELECT * FROM products WHERE id = 42'));
}
// La prima risposta è il risultato, gli altri task continuano a funzionare
$product = $group->race()->await();
any() – Ricerca Tollerante agli Errori
Interroga più provider, prendi la prima risposta riuscita, ignorando gli errori:
$group = new Async\TaskGroup();
$group->spawn(fn() => searchGoogle($query));
$group->spawn(fn() => searchBing($query));
$group->spawn(fn() => searchDuckDuckGo($query));
// any() ignora i provider che hanno fallito e restituisce il primo risultato riuscito
$results = $group->any()->await();
// Gli errori dei provider falliti devono essere gestiti esplicitamente, altrimenti il distruttore lancerà un'eccezione
$group->suppressErrors();
Se tutti i provider falliscono, any() lancerà CompositeException con tutti gli errori.
Limite di Concorrenza – Elaborazione di una Coda
Elabora 10.000 task, ma non più di 50 simultaneamente:
$group = new Async\TaskGroup(concurrency: 50);
foreach ($urls as $url) {
$group->spawn(fn() => httpClient()->get($url)->getBody());
}
$results = $group->all()->await();
TaskGroup accoda automaticamente i task. Una coroutine viene creata solo quando
si libera uno slot, risparmiando memoria con grandi volumi di task.
Iterazione sui Risultati Man Mano che si Completano
Elabora i risultati senza attendere il completamento di tutti i task:
$group = new Async\TaskGroup();
foreach ($imageFiles as $file) {
$group->spawn(fn() => processImage($file));
}
$group->seal();
foreach ($group as $key => $result) {
// I risultati arrivano man mano che diventano pronti, non nell'ordine in cui sono stati aggiunti
saveToStorage($result);
}
Timeout per un Gruppo di Task
Limita il tempo di attesa dei risultati:
$group = new Async\TaskGroup();
$group->spawn(fn() => slowApi()->fetchReport());
$group->spawn(fn() => anotherApi()->fetchStats());
$group->seal();
try {
$results = $group->all()->await(Async\timeout(5.0));
} catch (Async\TimeoutException) {
echo "Non è stato possibile ottenere i dati entro 5 secondi";
}
Analoghi in Altri Linguaggi
| Capacità | PHP TaskGroup |
Python asyncio.TaskGroup |
Java StructuredTaskScope |
Kotlin coroutineScope |
|---|---|---|---|---|
| Concorrenza strutturata | seal() + all()->await() |
blocco async with |
try-with-resources + join() |
Automaticamente tramite scope |
| Strategie di attesa | all(), race(), any() -> Future |
Solo all (tramite async with) |
ShutdownOnSuccess, ShutdownOnFailure |
async/await, select |
| Limite di concorrenza | concurrency: N |
No (serve Semaphore) |
No | No (serve Semaphore) |
| Iterazione dei risultati | foreach man mano che si completano |
No | No | Channel |
| Gestione degli errori | CompositeException, getErrors() |
ExceptionGroup |
throwIfFailed() |
L’eccezione cancella lo scope |
PHP TaskGroup combina capacità che in altri linguaggi sono distribuite su più primitive:
limitazione della concorrenza senza semaforo, strategie di attesa multiple in un singolo oggetto e iterazione dei risultati man mano che si completano.
Contenuti
- TaskGroup::__construct – Crea un gruppo di task
- TaskGroup::spawn – Aggiungi un task con chiave auto-increment
- TaskGroup::spawnWithKey – Aggiungi un task con chiave esplicita
- TaskGroup::all – Attendi tutti i task e ottieni i risultati
- TaskGroup::race – Ottieni il risultato del primo task completato
- TaskGroup::any – Ottieni il risultato del primo task riuscito
- TaskGroup::awaitCompletion – Attendi il completamento di tutti i task
- TaskGroup::seal – Sigilla il gruppo per nuovi task
- TaskGroup::cancel – Cancella tutti i task
- TaskGroup::dispose – Distruggi lo scope del gruppo
- TaskGroup::finally – Registra un handler di completamento
- TaskGroup::isFinished – Verifica se tutti i task sono terminati
- TaskGroup::isSealed – Verifica se il gruppo è sigillato
- TaskGroup::count – Ottieni il numero di task
- TaskGroup::getResults – Ottieni un array di risultati riusciti
- TaskGroup::getErrors – Ottieni un array di errori
- TaskGroup::suppressErrors – Marca gli errori come gestiti
- TaskGroup::getIterator – Itera sui risultati man mano che si completano