吃水不忘挖井人,thanks to
Eric Don
示例代码如下:
class AnsycQuery
{
public $connection_timout = 3;
public function __construct($worlds)
{
$this->_batch_connections = ['logdb'=>collect()];
//初始化连接实例
foreach ($worlds as $world) {
if (!$world->status)
{
//未启用的world不允许实时查询
continue;
}
$logdb_connection = $world->getLogDBConnection()->getDB();
$logdb_config = [
'host' => $logdb_connection->getConfig('host'),
'port' => $logdb_connection->getConfig('port'),
'user' => $logdb_connection->getConfig('username'),
'password' => $logdb_connection->getConfig('password'),
'dbname' => $logdb_connection->getConfig('database')
];
$retry=0;
while(++$retry)
{
try {
$logdb = mysqli_init();
$logdb->options(MYSQLI_OPT_CONNECT_TIMEOUT, $this->connection_timout);
$logdb->real_connect($logdb_config['host'], $logdb_config['user'], $logdb_config['password'], $logdb_config['dbname'], $logdb_config['port']);
$logdb->query("set names utf8");
$this->_batch_connections['logdb']->put($world->id,$logdb);
break;
}catch (\Exception $e)
{
if($retry>5)
{
\App::abort(500,"服务器:".$world->id."的logdb数据库连接超时({$this->connection_timout}秒*5次尝试)");
}
}
}
}
return $this->_batch_connections;
}
public function query($from="gamedb", $sql)
{
switch ($from)
{
case "logdb":
$connections = $this->_batch_connections['logdb'];
break;
default:
return false;
break;
}
foreach ($connections as $world_id=>$connection)
{
$replaced_sql = str_replace('{world_id}', $world_id, $sql);
//文档地址:http://php.net/manual/zh/mysqli.query.php
$connection->query($replaced_sql, MYSQLI_ASYNC);
}
$processed = 0;
$ret = collect();
//开始执行并行查询
do {
$links = $errors = $reject = array();
foreach ($connections as $link) {
$links[] = $errors[] = $reject[] = $link;
}
if (!mysqli_poll($links, $errors, $reject, 1)) {
continue;
}
foreach ($links as $link) {
if($result = $link->reap_async_query()) {
if(is_object($result)){
while($row = $result->fetch_object())
{
$ret->push($row);
}
mysqli_free_result($result);
}else if (false !== $result) {
//据文档说,insert, delete之类的,会在这里返回
$ret->push($result);
} else {
throw new \mysqli_sql_exception(mysqli_error($link));
}
}
$processed++;
}
} while ($processed < $connections->count());
return $ret;
}
}
注意:并行批量查询,需要服务器的php安装mysqlnd扩展,ubuntu下执行apt-get install php5-mysqlnd,然后执行php5enmod mysqlnd,mysqli也是需要开启的
由于是并行查询,每次返回结果顺序可能都会有差异。