Async\Pool: Pool Universale di Risorse
Perché Serve un Pool
Quando si lavora con le coroutine, sorge il problema della condivisione dei descrittori di I/O. Se lo stesso socket viene usato da due coroutine che contemporaneamente scrivono o leggono pacchetti diversi da esso, i dati si mescolano e il risultato è imprevedibile. Pertanto, non puoi semplicemente usare lo stesso oggetto PDO in diverse coroutine!
D'altra parte, creare una connessione separata per ogni coroutine è una strategia molto dispendiosa. Annulla i vantaggi dell'I/O concorrente. Pertanto, tipicamente si usano pool di connessioni per interagire con API esterne, database e altre risorse.
Un pool risolve questo problema: le risorse vengono create in anticipo, fornite alle coroutine su richiesta, e restituite per il riutilizzo.
use Async\Pool;
// Pool di connessioni HTTP
$pool = new Pool(
factory: fn() => new HttpConnection('api.example.com'),
destructor: fn($conn) => $conn->close(),
min: 2,
max: 10,
);
// Una coroutine prende una connessione, la usa e la restituisce
$conn = $pool->acquire();
$response = $conn->request('GET', '/users');
$pool->release($conn);Creazione di un Pool
$pool = new Pool(
factory: fn() => createResource(), // Come creare una risorsa
destructor: fn($r) => $r->close(), // Come distruggere una risorsa
healthcheck: fn($r) => $r->ping(), // La risorsa è attiva?
beforeAcquire: fn($r) => $r->isValid(), // Controllo prima della consegna
beforeRelease: fn($r) => !$r->isBroken(), // Controllo prima della restituzione
min: 2, // Pre-crea 2 risorse
max: 10, // Massimo 10 risorse
healthcheckInterval: 30000, // Controllo ogni 30 sec
);| Parametro | Scopo | Predefinito |
|---|---|---|
factory | Crea una nuova risorsa. Obbligatorio | -- |
destructor | Distrugge una risorsa quando viene rimossa dal pool | null |
healthcheck | Controllo periodico: la risorsa è ancora attiva? | null |
beforeAcquire | Controllo prima della consegna. false -- distruggi e prendi la prossima | null |
beforeRelease | Controllo prima della restituzione. false -- distruggi, non restituire | null |
min | Quante risorse creare in anticipo (pre-warming) | 0 |
max | Risorse massime (libere + in uso) | 10 |
healthcheckInterval | Intervallo di health check in background (ms, 0 = disabilitato) | 0 |
Acquire e Release
Acquire Bloccante
// Attendi finché una risorsa non diventa disponibile (indefinitamente)
$resource = $pool->acquire();
// Attendi al massimo 5 secondi
$resource = $pool->acquire(timeout: 5000);Se il pool è pieno (tutte le risorse sono in uso e max è raggiunto), la coroutine si sospende e attende finché un'altra coroutine non restituisce una risorsa. Le altre coroutine continuano a funzionare.
Al timeout, viene lanciata una PoolException.
tryAcquire Non Bloccante
$resource = $pool->tryAcquire();
if ($resource === null) {
echo "Tutte le risorse sono occupate, riproviamo dopo\n";
} else {
// Usa la risorsa
$pool->release($resource);
}tryAcquire() restituisce null immediatamente se una risorsa non è disponibile. La coroutine non viene sospesa.
Release
$resource = $pool->acquire();
try {
doWork($resource);
} finally {
// IMPORTANTE: restituisci sempre la risorsa al pool!
$pool->release($resource);
}Se beforeRelease è impostato e restituisce false, la risorsa è considerata danneggiata e viene distrutta invece di essere restituita al pool.
Statistiche
echo $pool->count(); // Risorse totali (libere + in uso)
echo $pool->idleCount(); // Libere, pronte per la consegna
echo $pool->activeCount(); // Attualmente in uso dalle coroutineChiusura del Pool
$pool->close();Alla chiusura:
- Tutte le coroutine in attesa ricevono una
PoolException - Tutte le risorse libere vengono distrutte tramite
destructor - Le risorse occupate vengono distrutte al successivo
release
Healthcheck: Controllo in Background
Se healthcheckInterval è impostato, il pool controlla periodicamente le risorse libere. Le risorse morte vengono distrutte e sostituite con nuove (se il conteggio è sceso sotto min).
$pool = new Pool(
factory: fn() => new DatabaseConnection($dsn),
destructor: fn($conn) => $conn->close(),
healthcheck: fn($conn) => $conn->ping(), // Controllo: la connessione è attiva?
min: 3,
max: 10,
healthcheckInterval: 10000, // Ogni 10 secondi
);L'healthcheck funziona solo per le risorse libere. Le risorse occupate non vengono controllate.
Circuit Breaker
Il pool implementa il pattern Circuit Breaker per gestire la disponibilità del servizio.
Tre Stati
| Stato | Comportamento |
|---|---|
ACTIVE | Tutto funziona, le richieste passano |
INACTIVE | Servizio non disponibile, acquire() lancia un'eccezione |
RECOVERING | Modalità test, richieste limitate |
use Async\CircuitBreakerState;
// Verifica lo stato
$state = $pool->getState(); // CircuitBreakerState::ACTIVE
// Controllo manuale
$pool->deactivate(); // Passa a INACTIVE
$pool->recover(); // Passa a RECOVERING
$pool->activate(); // Passa a ACTIVEGestione Automatica tramite Strategia
use Async\CircuitBreakerStrategy;
class MyStrategy implements CircuitBreakerStrategy
{
private int $failures = 0;
public function reportSuccess(mixed $source): void {
$this->failures = 0;
$source->activate();
}
public function reportFailure(mixed $source, \Throwable $error): void {
$this->failures++;
if ($this->failures >= 5) {
$source->deactivate();
}
}
}
$pool->setCircuitBreakerStrategy(new MyStrategy());La strategia viene chiamata automaticamente:
reportSuccess()-- alla restituzione riuscita della risorsa al poolreportFailure()-- quandobeforeReleaserestituiscefalse(risorsa danneggiata)
Ciclo di Vita della Risorsa
Esempio Reale: Pool di Connessioni Redis
use Async\Pool;
use function Async\spawn;
use function Async\await;
$redis = new Pool(
factory: function() {
$conn = new Redis();
$conn->connect('127.0.0.1', 6379);
return $conn;
},
destructor: fn($conn) => $conn->close(),
healthcheck: fn($conn) => $conn->ping(),
min: 2,
max: 20,
healthcheckInterval: 15000,
);
// 100 coroutine leggono concorrentemente da Redis attraverso 20 connessioni
$coroutines = [];
for ($i = 0; $i < 100; $i++) {
$coroutines[] = spawn(function() use ($redis, $i) {
$conn = $redis->acquire(timeout: 3000);
try {
return $conn->get("key:$i");
} finally {
$redis->release($conn);
}
});
}
$results = array_map(fn($c) => await($c), $coroutines);
$redis->close();PDO Pool
Per PDO, esiste un'integrazione built-in con Async\Pool che rende il pooling completamente trasparente. Invece di acquire/release manuali, il pool viene gestito automaticamente dietro le quinte.
Scopri di più: PDO Pool
Cosa Leggere Dopo?
- Architettura di Async\Pool -- internals, diagrammi, API C
- PDO Pool -- pool trasparente per PDO
- Coroutine -- come funzionano le coroutine
- Canali -- scambio di dati tra coroutine