Channels

Channels are more useful for communication in a multithreaded environment than in a single-threaded one. They serve for safe data transfer from one coroutine to another. If you need to modify shared data, in a single-threaded environment it’s simpler to pass an object to different coroutines than to create a channel.

However, channels are useful in the following scenarios:

For example, there are many URLs to crawl, but no more than N connections simultaneously:

use Async\Channel;
use Async\Scope;

const MAX_CONNECTIONS = 10;
const MAX_QUEUE = 100;

$tasks = new Scope();
$channel = new Channel(MAX_QUEUE);

for($i = 0; $i < MAX_CONNECTIONS; $i++) {
    $tasks->spawn(function() use ($channel) {
        while (!$channel->isClosed()) {
            $url = $channel->recv();
            $content = file_get_contents($url);
            echo "Fetched page {$url}, length: " . strlen($content) . "\n";
        }
    });
}

// Fill the channel with values
for($i = 0; $i < MAX_CONNECTIONS * 2; $i++) {
    $channel->send("https://example.com/{$i}");
}

The MAX_QUEUE constant in this example acts as a limiter for the producer, creating backpressure – a situation where the producer cannot send data until the consumer frees up space in the channel.

Unbuffered Channel (Rendezvous)

A channel with buffer size 0 works in rendezvous mode: send() blocks until another coroutine calls recv(), and vice versa. This ensures strict synchronization:

use Async\Channel;

$ch = new Channel(0); // Rendezvous channel

spawn(function() use ($ch) {
    echo "Sender: before send\n";
    $ch->send("hello");
    echo "Sender: send completed\n"; // Only after recv()
});

spawn(function() use ($ch) {
    echo "Receiver: before recv\n";
    $value = $ch->recv();
    echo "Receiver: got $value\n";
});

Cancellation

The recv() and send() methods accept an optional cancellation token (Completable) that allows cancelling the wait on any condition. This is more flexible than a fixed timeout – you can cancel an operation from another coroutine, on a signal, on an event, or by time:

use Async\Channel;
use Async\CancelledException;

$ch = new Channel(0);

// Cancellation by timeout
spawn(function() use ($ch) {
    try {
        $ch->recv(Async\timeout(50)); // Wait no more than 50 ms
    } catch (CancelledException $e) {
        echo "Nobody sent data within 50 ms\n";
    }
});

// Cancellation by a custom condition
spawn(function() use ($ch) {
    $cancel = new \Async\Future();

    spawn(function() use ($cancel) {
        // Cancel after 50 ms
        Async\delay(50);
        $cancel->complete(null);
    });

    try {
        $ch->send("data", $cancel);
    } catch (CancelledException $e) {
        echo "Nobody received the data -- operation cancelled\n";
    }
});

Competing Receivers

If multiple coroutines are waiting on recv() on the same channel, each value is received by only one of them. Values are not duplicated:

use Async\Channel;

$ch = new Channel(0);

// Sender
spawn(function() use ($ch) {
    for ($i = 1; $i <= 3; $i++) {
        $ch->send($i);
    }
    $ch->close();
});

// Receiver A
spawn(function() use ($ch) {
    try {
        while (true) {
            $v = $ch->recv();
            echo "A received: $v\n";
        }
    } catch (\Async\ChannelException) {}
});

// Receiver B
spawn(function() use ($ch) {
    try {
        while (true) {
            $v = $ch->recv();
            echo "B received: $v\n";
        }
    } catch (\Async\ChannelException) {}
});

// Each value (1, 2, 3) will be received by only A or B, but not both

This pattern is useful for implementing worker pools, where multiple coroutines compete for tasks from a shared queue.