Async\Pool: Universeller Ressourcenpool
Warum ein Pool benötigt wird
Bei der Arbeit mit Koroutinen entsteht das Problem der gemeinsamen Nutzung von I/O-Deskriptoren.
Wenn derselbe Socket von zwei Koroutinen verwendet wird, die gleichzeitig verschiedene Pakete schreiben oder lesen,
werden die Daten durcheinander gebracht und das Ergebnis ist unvorhersehbar.
Daher können Sie nicht einfach dasselbe PDO-Objekt in verschiedenen Koroutinen verwenden!
Andererseits ist das wiederholte Erstellen einer separaten Verbindung für jede Koroutine eine sehr verschwenderische Strategie. Dadurch werden die Vorteile der gleichzeitigen I/O zunichte gemacht. Deshalb werden üblicherweise Verbindungspools für die Interaktion mit externen APIs, Datenbanken und anderen Ressourcen verwendet.
Ein Pool löst dieses Problem: Ressourcen werden im Voraus erstellt, auf Anfrage an Koroutinen vergeben und zur Wiederverwendung zurückgegeben.
use Async\Pool;
// HTTP-Verbindungspool
$pool = new Pool(
factory: fn() => new HttpConnection('api.example.com'),
destructor: fn($conn) => $conn->close(),
min: 2,
max: 10,
);
// Eine Koroutine nimmt eine Verbindung, nutzt sie und gibt sie zurück
$conn = $pool->acquire();
$response = $conn->request('GET', '/users');
$pool->release($conn);
Pool erstellen
$pool = new Pool(
factory: fn() => createResource(), // Wie eine Ressource erstellt wird
destructor: fn($r) => $r->close(), // Wie eine Ressource zerstört wird
healthcheck: fn($r) => $r->ping(), // Lebt die Ressource noch?
beforeAcquire: fn($r) => $r->isValid(), // Prüfung vor der Ausgabe
beforeRelease: fn($r) => !$r->isBroken(), // Prüfung vor der Rückgabe
min: 2, // 2 Ressourcen vorab erstellen
max: 10, // Maximal 10 Ressourcen
healthcheckInterval: 30000, // Alle 30 Sek. prüfen
);
| Parameter | Zweck | Standard |
|---|---|---|
factory |
Erstellt eine neue Ressource. Erforderlich | – |
destructor |
Zerstört eine Ressource beim Entfernen aus dem Pool | null |
healthcheck |
Periodische Prüfung: Lebt die Ressource noch? | null |
beforeAcquire |
Prüfung vor der Ausgabe. false – zerstören und nächste nehmen |
null |
beforeRelease |
Prüfung vor der Rückgabe. false – zerstören, nicht zurückgeben |
null |
min |
Wie viele Ressourcen vorab erstellt werden (Pre-Warming) | 0 |
max |
Maximale Ressourcen (frei + in Verwendung) | 10 |
healthcheckInterval |
Intervall der Hintergrund-Gesundheitsprüfung (ms, 0 = deaktiviert) | 0 |
Acquire und Release
Blockierendes Acquire
// Warten, bis eine Ressource verfügbar ist (unbegrenzt)
$resource = $pool->acquire();
// Maximal 5 Sekunden warten
$resource = $pool->acquire(timeout: 5000);
Wenn der Pool voll ist (alle Ressourcen sind in Verwendung und max ist erreicht), wird die Koroutine angehalten
und wartet, bis eine andere Koroutine eine Ressource zurückgibt. Andere Koroutinen laufen weiter.
Bei Timeout wird eine PoolException geworfen.
Nicht-blockierendes tryAcquire
$resource = $pool->tryAcquire();
if ($resource === null) {
echo "Alle Ressourcen sind belegt, versuchen wir es später\n";
} else {
// Ressource verwenden
$pool->release($resource);
}
tryAcquire() gibt sofort null zurück, wenn keine Ressource verfügbar ist. Die Koroutine wird nicht angehalten.
Release
$resource = $pool->acquire();
try {
doWork($resource);
} finally {
// WICHTIG: Ressource immer an den Pool zurückgeben!
$pool->release($resource);
}
Wenn beforeRelease gesetzt ist und false zurückgibt, gilt die Ressource als beschädigt
und wird zerstört, anstatt an den Pool zurückgegeben zu werden.
Statistiken
echo $pool->count(); // Gesamtressourcen (frei + in Verwendung)
echo $pool->idleCount(); // Frei, bereit zur Ausgabe
echo $pool->activeCount(); // Derzeit von Koroutinen verwendet
Pool schließen
$pool->close();
Beim Schließen:
- Alle wartenden Koroutinen erhalten eine
PoolException - Alle freien Ressourcen werden über den
destructorzerstört - Belegte Ressourcen werden beim anschließenden
releasezerstört
Healthcheck: Hintergrundprüfung
Wenn healthcheckInterval gesetzt ist, prüft der Pool regelmäßig die freien Ressourcen.
Tote Ressourcen werden zerstört und durch neue ersetzt (wenn die Anzahl unter min gefallen ist).
$pool = new Pool(
factory: fn() => new DatabaseConnection($dsn),
destructor: fn($conn) => $conn->close(),
healthcheck: fn($conn) => $conn->ping(), // Prüfung: Lebt die Verbindung noch?
min: 3,
max: 10,
healthcheckInterval: 10000, // Alle 10 Sekunden
);
Healthcheck funktioniert nur für freie Ressourcen. Belegte Ressourcen werden nicht geprüft.
Circuit Breaker
Der Pool implementiert das Circuit Breaker-Muster zur Verwaltung der Dienstverfügbarkeit.
Drei Zustände
| Zustand | Verhalten |
|---|---|
ACTIVE |
Alles funktioniert, Anfragen werden durchgelassen |
INACTIVE |
Dienst nicht verfügbar, acquire() wirft eine Ausnahme |
RECOVERING |
Testmodus, begrenzte Anfragen |
use Async\CircuitBreakerState;
// Zustand prüfen
$state = $pool->getState(); // CircuitBreakerState::ACTIVE
// Manuelle Steuerung
$pool->deactivate(); // Auf INACTIVE umschalten
$pool->recover(); // Auf RECOVERING umschalten
$pool->activate(); // Auf ACTIVE umschalten
Automatische Verwaltung über Strategie
use Async\CircuitBreakerStrategy;
class MyStrategy implements CircuitBreakerStrategy
{
private int $failures = 0;
public function reportSuccess(mixed $source): void {
$this->failures = 0;
$source->activate();
}
public function reportFailure(mixed $source, \Throwable $error): void {
$this->failures++;
if ($this->failures >= 5) {
$source->deactivate();
}
}
}
$pool->setCircuitBreakerStrategy(new MyStrategy());
Die Strategie wird automatisch aufgerufen:
reportSuccess()– bei erfolgreicher Rückgabe der Ressource an den PoolreportFailure()– wennbeforeReleasefalsezurückgibt (Ressource ist beschädigt)
Ressourcenlebenszyklus
Praxisbeispiel: Redis-Verbindungspool
use Async\Pool;
use function Async\spawn;
use function Async\await;
$redis = new Pool(
factory: function() {
$conn = new Redis();
$conn->connect('127.0.0.1', 6379);
return $conn;
},
destructor: fn($conn) => $conn->close(),
healthcheck: fn($conn) => $conn->ping(),
min: 2,
max: 20,
healthcheckInterval: 15000,
);
// 100 Koroutinen lesen gleichzeitig aus Redis über 20 Verbindungen
$coroutines = [];
for ($i = 0; $i < 100; $i++) {
$coroutines[] = spawn(function() use ($redis, $i) {
$conn = $redis->acquire(timeout: 3000);
try {
return $conn->get("key:$i");
} finally {
$redis->release($conn);
}
});
}
$results = array_map(fn($c) => await($c), $coroutines);
$redis->close();
PDO Pool
Für PDO gibt es eine integrierte Integration mit Async\Pool, die das Pooling vollständig transparent macht.
Anstelle von manuellem acquire/release wird der Pool automatisch im Hintergrund verwaltet.
Mehr erfahren: PDO Pool
Wie geht es weiter?
- Async\Pool-Architektur – Interna, Diagramme, C API
- PDO Pool – transparenter Pool für PDO
- Koroutinen – wie Koroutinen funktionieren
- Channels – Datenaustausch zwischen Koroutinen