Asynchronous Operations: Extensions
Async by itself is a highly useful construct that can save time through its cooperative multitasking infrastructure. It is especially useful for database access and caching, web resource access, and dealing with streams.
MySQL
The async MySQL extension is similar to the mysqli extension that comes with HHVM. The async MySQL extension is primarily used for asynchronously creating connections and querying MySQL databases.
The full API contains all of the classes and methods available for accessing MySQL via async; we will only cover a few of the more common scenarios here.
The primary class for connecting to a MySQL database is AsyncMysqlConnectionPool, and its primary method is the async connect.
The primary class for querying a database is AsyncMysqlConnection, with the two main query methods, query and queryf, both of which are async. There is also a function called escapeString, which escapes strings so that the queries to be executed are safe.
The primary class for retrieving results from a query is an abstract class called AsyncMysqlResult, which itself has two concrete subclasses called AsyncMysqlQueryResult and AsyncMysqlErrorResult. The main methods on these classes are vectorRows and mapRows, both non-async.
Here is a simple example that shows how to get a user name from a database using this extension:
require __DIR__.'/async_mysql_connect.inc.php';
use \Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;
async function get_connection(): Awaitable<\AsyncMysqlConnection> {
// Get a connection pool with default options
$pool = new \AsyncMysqlConnectionPool(darray[]);
// Change credentials to something that works in order to test this code
return await $pool->connect(
CI::$host,
CI::$port,
CI::$db,
CI::$user,
CI::$passwd,
);
}
async function fetch_user_name(
\AsyncMysqlConnection $conn,
int $user_id,
): Awaitable<?string> {
// Your table and column may differ, of course
$result = await $conn->queryf(
'SELECT name from test_table WHERE userID = %d',
$user_id,
);
// There shouldn't be more than one row returned for one user id
invariant($result->numRows() === 1, 'one row exactly');
// A vector of vector objects holding the string values of each column
// in the query
$vector = $result->vectorRows();
return $vector[0][0]; // We had one column in our query
}
async function get_user_info(
\AsyncMysqlConnection $conn,
string $user,
): Awaitable<Vector<Map<string, ?string>>> {
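// queryf() escapes %s arguments itself, so the raw string is passed here;
// calling escapeString() first would escape the value twice
$result = await $conn->queryf(
'SELECT * from test_table WHERE name = %s',
$user,
);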
// A vector of map objects holding the string values of each column
// in the query, and the keys being the column names
$map = $result->mapRows();
return $map;
}
async function async_mysql_tutorial(): Awaitable<void> {
$conn = await get_connection();
if ($conn !== null) {
$result = await fetch_user_name($conn, 2);
\var_dump($result);
$info = await get_user_info($conn, 'Fred Emmott');
\var_dump($info is vec<_>);
\var_dump($info[0] is dict<_, _>);
}
}
<<__EntryPoint>>
function main(): void {
\HH\Asio\join(async_mysql_tutorial());
}
string(11) "Fred Emmott"
bool(true)
bool(true)
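For comparison with queryf, here is a minimal sketch of the same kind of lookup using query together with escapeString. Because query takes a finished SQL string, any untrusted value is escaped by hand before it is concatenated. The function name fetch_user_ids_by_name is hypothetical, and the table and column names are simply the ones assumed in the example above.

```hack
// Hypothetical helper: look up user IDs by name with query() + escapeString()
async function fetch_user_ids_by_name(
  \AsyncMysqlConnection $conn,
  string $name,
): Awaitable<Vector<Map<string, ?string>>> {
  // query() does no placeholder substitution, so escape the input ourselves
  $safe_name = $conn->escapeString($name);
  $result = await $conn->query(
    "SELECT userID FROM test_table WHERE name = '".$safe_name."'",
  );
  // One map per row, keyed by column name
  return $result->mapRows();
}
```

In most cases queryf is the more convenient choice, since its placeholders take care of the escaping.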
Connection Pools
The async MySQL extension does not support multiplexing; each concurrent query requires its own connection. However, the extension does support connection pooling.
The async MySQL extension provides a mechanism to pool connection objects so we don't have to create a new connection every time we want to make a query. The class is AsyncMysqlConnectionPool, and one can be created like this:
require __DIR__.'/async_mysql_connect.inc.php';
use \Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;
function get_pool(): \AsyncMysqlConnectionPool {
return new \AsyncMysqlConnectionPool(
darray['pool_connection_limit' => 100],
); // See API for more pool options
}
async function get_connection(): Awaitable<\AsyncMysqlConnection> {
$pool = get_pool();
$conn = await $pool->connect(
CI::$host,
CI::$port,
CI::$db,
CI::$user,
CI::$passwd,
);
return $conn;
}
async function run(): Awaitable<void> {
$conn = await get_connection();
\var_dump($conn);
}
<<__EntryPoint>>
function main(): void {
\HH\Asio\join(run());
}
object(AsyncMysqlConnection)#6 (0) {
}
It is highly recommended that connection pools be used for MySQL connections. If for some reason we really need a single asynchronous client, there is an AsyncMysqlClient class that provides a connect method.
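A pool only pays off if it outlives a single call. The following is a minimal sketch under that assumption: it memoizes one pool and runs three queries concurrently, each on its own connection, since the extension does not multiplex. It assumes the same ConnectionInfo helper (aliased as CI) used in the examples above; the names get_shared_pool, ping and ping_concurrently are hypothetical and not part of the extension.

```hack
require __DIR__.'/async_mysql_connect.inc.php';
use \Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

// Hypothetical helper: build the pool once and reuse it on later calls
<<__Memoize>>
function get_shared_pool(): \AsyncMysqlConnectionPool {
  return new \AsyncMysqlConnectionPool(
    darray['pool_connection_limit' => 100],
  );
}

// Each call asks the shared pool for a connection and runs a trivial query
async function ping(): Awaitable<int> {
  $conn = await get_shared_pool()->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
  $result = await $conn->query('SELECT 1');
  return $result->numRows();
}

// Three concurrent queries, so three connections taken from the same pool
async function ping_concurrently(): Awaitable<vec<int>> {
  return await \HH\Lib\Vec\from_async(vec[ping(), ping(), ping()]);
}
```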
MCRouter
MCRouter is a memcached protocol-routing library. To help with memcached deployment, it provides features such as connection pooling and prefix-based routing.
The async MCRouter extension is basically an async subset of the Memcached extension that is part of HHVM. The primary class is MCRouter. There are two ways to create an instance of an MCRouter object. The createSimple method takes a vector of server addresses where memcached is running. The more configurable __construct method allows finer control over the options. After getting an object, we can use the async versions of the core memcached protocol methods like add, get and del.
Here is a simple example showing how one might get a user name from memcached:
require __DIR__."/../../../../vendor/hh_autoload.php"; // For wrap()
function get_mcrouter_object(): \MCRouter {
$servers = Vector {\getenv('HHVM_TEST_MCROUTER')};
$mc = \MCRouter::createSimple($servers);
return $mc;
}
async function add_user_name(
\MCRouter $mcr,
int $id,
string $value,
): Awaitable<void> {
$key = 'name:'.$id;
await $mcr->set($key, $value);
}
async function get_user_name(\MCRouter $mcr, int $user_id): Awaitable<string> {
$key = 'name:'.$user_id;
try {
$res = await \HH\Asio\wrap($mcr->get($key));
if ($res->isSucceeded()) {
return $res->getResult();
}
return "";
} catch (\MCRouterException $ex) {
echo $ex->getKey().\PHP_EOL.$ex->getOp();
return "";
}
}
async function run(): Awaitable<void> {
$mcr = get_mcrouter_object();
await add_user_name($mcr, 1, 'Joel');
$name = await get_user_name($mcr, 1);
\var_dump($name); // Should print "Joel"
}
<<__EntryPoint>>
function main(): void {
\HH\Asio\join(run());
}
string(4) "Joel"
If an issue occurs when using this protocol, two possible exceptions can be thrown: MCRouterException when something goes wrong with a core operation, like deleting a key; MCRouterOptionException when the provided option list can't be parsed.
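As an illustration of the first case, here is a minimal sketch of a delete that reports the failure instead of letting the exception propagate. The function name delete_user_name is hypothetical; the key format and the getOp and getKey accessors are the ones used in the example above.

```hack
// Hypothetical helper: returns true if the key was deleted, false otherwise
async function delete_user_name(
  \MCRouter $mcr,
  int $user_id,
): Awaitable<bool> {
  $key = 'name:'.$user_id;
  try {
    await $mcr->del($key);
    return true;
  } catch (\MCRouterException $ex) {
    // Report which operation failed and on which key
    echo 'Failed '.$ex->getOp().' on '.$ex->getKey().\PHP_EOL;
    return false;
  }
}
```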
cURL
Hack currently provides two async functions for cURL.
curl_multi_await
cURL provides a data transfer library for URLs. The async cURL extension provides two functions, one of which is a wrapper around the other. curl_multi_await is the async version of HHVM's curl_multi_select. It waits until there is activity on the cURL handle and, when it completes, we use curl_multi_exec to process the result, just as we would in the non-async situation.
async function curl_multi_await(resource $mh, float $timeout = 1.0): Awaitable<int>;
curl_exec
The function HH\Asio\curl_exec is a wrapper around curl_multi_await. It is easy to use as we don't necessarily have to worry about resource creation since we can just pass a string URL to it.
namespace HH\Asio {
async function curl_exec(mixed $urlOrHandle): Awaitable<string>;
}
Here is an example of getting a vector of URL contents, using a lambda expression to cut down on the code verbosity that would come with full closure syntax:
function get_urls(): vec<string> {
return vec[
"http://example.com",
"http://example.net",
"http://example.org",
];
}
async function get_combined_contents(
vec<string> $urls,
): Awaitable<vec<string>> {
// Use lambda shorthand syntax here instead of full closure syntax
$handles = \HH\Lib\Vec\map_with_key(
$urls,
($idx, $url) ==> \HH\Asio\curl_exec($url),
);
$contents = await \HH\Lib\Vec\from_async($handles);
echo \HH\Lib\C\count($contents)."\n";
return $contents;
}
<<__EntryPoint>>
function main(): void {
\HH\Asio\join(get_combined_contents(get_urls()));
}
3
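Since the parameter of HH\Asio\curl_exec is declared as mixed $urlOrHandle, a cURL handle can be passed instead of a URL when options need to be set first. The following is a minimal sketch of that usage; the function name fetch_with_options and the particular options chosen are only illustrative, and the sketch assumes that the caller remains responsible for closing a handle it created.

```hack
// Hypothetical helper: configure a handle, then let curl_exec await it
async function fetch_with_options(string $url): Awaitable<string> {
  $ch = \curl_init($url);
  // Return the body as a string rather than printing it
  \curl_setopt($ch, \CURLOPT_RETURNTRANSFER, true);
  // Follow redirects and give up after 5 seconds
  \curl_setopt($ch, \CURLOPT_FOLLOWLOCATION, true);
  \curl_setopt($ch, \CURLOPT_TIMEOUT, 5);
  $body = await \HH\Asio\curl_exec($ch);
  // Assumption: the handle we created is ours to close
  \curl_close($ch);
  return $body;
}
```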
Streams
The async stream extension has one function, stream_await, which is functionally similar to HHVM's stream_select. It waits for a stream to enter a state (e.g., STREAM_AWAIT_READY), but without the multiplexing functionality of stream_select. We can use HH\Lib\Vec\from_async to await multiple stream handles, but the resulting combined awaitable won't be complete until all of the underlying streams have completed.
async function stream_await(resource $fp, int $events, float $timeout = 0.0): Awaitable<int>;
The following example shows how to use stream_await to write to resources:
function get_resources(): vec<resource> {
$r1 = \fopen('php://stdout', 'w');
$r2 = \fopen('php://stdout', 'w');
$r3 = \fopen('php://stdout', 'w');
return vec[$r1, $r2, $r3];
}
async function write_all(vec<resource> $resources): Awaitable<void> {
$write_single_resource = async function(resource $r) {
$status = await \stream_await($r, \STREAM_AWAIT_WRITE, 1.0);
if ($status === \STREAM_AWAIT_READY) {
\fwrite($r, \str_shuffle('ABCDEF').\PHP_EOL);
}
};
// You will get 3 shuffled strings, each on a separate line.
await \HH\Lib\Vec\from_async(\array_map($write_single_resource, $resources));
}
<<__EntryPoint>>
function main(): void {
\HH\Asio\join(write_all(get_resources()));
}
CFEADB
EDBACF
CBDEFA
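The same pattern applies to reading. The following is a minimal sketch that waits for a stream to become readable and gives up when the timeout expires. It assumes the STREAM_AWAIT_READ event constant, the read counterpart of the STREAM_AWAIT_WRITE constant used above; the function name read_line_or_null is hypothetical.

```hack
// Hypothetical helper: returns one line, or null if nothing could be read
async function read_line_or_null(
  resource $fp,
  float $timeout,
): Awaitable<?string> {
  $status = await \stream_await($fp, \STREAM_AWAIT_READ, $timeout);
  if ($status !== \STREAM_AWAIT_READY) {
    // Any status other than "ready" is treated as "nothing to read"
    return null;
  }
  $line = \fgets($fp);
  // fgets() returns false at end of file, so only pass strings through
  return $line is string ? $line : null;
}
```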