php使用mysqlnd扩展进行mysql异步并行查询

吃水不忘挖井人,thanks to Eric Don

示例代码如下:

/**
 * Runs one SQL statement against the log database of every enabled world in
 * parallel, using mysqli's asynchronous query API (requires mysqlnd).
 *
 * The constructor opens one mysqli connection per enabled world; query()
 * dispatches the statement on all of them, polls until every connection has
 * answered, and returns all rows merged into a single collection.
 */
class AnsycQuery
{
    /**
     * Maximum number of connection attempts per world.
     * The abort message in connectLogDB() hard-codes "5"; keep both in sync.
     */
    const MAX_CONNECT_ATTEMPTS = 5;

    /** @var int Connect timeout in seconds, passed as MYSQLI_OPT_CONNECT_TIMEOUT. */
    public $connection_timout = 3;

    /**
     * Source name => collection of mysqli connections keyed by world id.
     * Kept public because it used to be created as a dynamic (public) property.
     *
     * @var array
     */
    public $_batch_connections = [];

    /**
     * Opens a logdb connection for every enabled world.
     *
     * @param iterable $worlds World objects; each must expose `status`, `id`
     *                         and `getLogDBConnection()->getDB()->getConfig()`.
     */
    public function __construct($worlds)
    {
        $this->_batch_connections = ['logdb' => collect()];

        foreach ($worlds as $world) {
            if (!$world->status) {
                // Disabled worlds are not allowed to be queried in real time.
                continue;
            }
            $this->_batch_connections['logdb']->put($world->id, $this->connectLogDB($world));
        }
    }

    /**
     * Connects to the log database of one world, retrying on failure.
     *
     * Aborts the request with HTTP 500 once MAX_CONNECT_ATTEMPTS attempts
     * have failed.
     *
     * @param object $world
     * @return \mysqli An established connection.
     */
    private function connectLogDB($world)
    {
        $logdb_connection = $world->getLogDBConnection()->getDB();
        $logdb_config = [
            'host' => $logdb_connection->getConfig('host'),
            'port' => $logdb_connection->getConfig('port'),
            'user' => $logdb_connection->getConfig('username'),
            'password' => $logdb_connection->getConfig('password'),
            'dbname' => $logdb_connection->getConfig('database')
        ];

        for ($attempt = 1; $attempt <= self::MAX_CONNECT_ATTEMPTS; $attempt++) {
            try {
                $logdb = mysqli_init();
                if (!$logdb) {
                    throw new \RuntimeException('mysqli_init() failed');
                }
                $logdb->options(MYSQLI_OPT_CONNECT_TIMEOUT, $this->connection_timout);
                // real_connect() only throws when mysqli is in exception mode;
                // otherwise it returns false, so the result must be checked
                // explicitly or a dead connection would be stored.
                $connected = $logdb->real_connect(
                    $logdb_config['host'],
                    $logdb_config['user'],
                    $logdb_config['password'],
                    $logdb_config['dbname'],
                    $logdb_config['port']
                );
                if (!$connected) {
                    throw new \RuntimeException((string) mysqli_connect_error());
                }
                $logdb->query("set names utf8");
                return $logdb;
            } catch (\Exception $e) {
                // Swallow and retry; the failure is reported after the last attempt.
            }
        }

        \App::abort(500, "服务器:".$world->id."的logdb数据库连接超时({$this->connection_timout}秒*5次尝试)");
        // \App::abort() is expected to throw; this guard ensures a missing
        // connection is never stored if it does not.
        throw new \RuntimeException('logdb connection failed for world ' . $world->id);
    }

    /**
     * Executes $sql on every connection of the given source in parallel.
     *
     * The placeholder `{world_id}` inside $sql is replaced with the id of the
     * world each connection belongs to. Because the queries run concurrently,
     * the order of rows in the result may differ between calls.
     *
     * NOTE: $from has a default but precedes a required parameter, so callers
     * must always pass it; the signature is kept as-is for compatibility.
     *
     * @param string $from Source name; only sources registered in
     *                     $_batch_connections (currently "logdb") are valid.
     * @param string $sql  Statement to run on every connection.
     * @return mixed Collection of row objects (plus `true` for each statement
     *               that produced no result set), or false for an unknown source.
     * @throws \mysqli_sql_exception If any connection failed to dispatch or
     *                               execute the statement. Thrown only after all
     *                               in-flight queries have been reaped.
     */
    public function query($from="gamedb", $sql)
    {
        if (!is_string($from) || !isset($this->_batch_connections[$from])) {
            return false;
        }
        $connections = $this->_batch_connections[$from];

        $ret = collect();
        $messages = [];
        // Connections with a query in flight, keyed by object hash so each
        // one can be dropped as soon as it has been reaped.
        $pending = [];

        foreach ($connections as $world_id => $connection) {
            $replaced_sql = str_replace('{world_id}', $world_id, $sql);
            try {
                // Docs: http://php.net/manual/zh/mysqli.query.php
                $sent = $connection->query($replaced_sql, MYSQLI_ASYNC);
                $message = (false === $sent) ? mysqli_error($connection) : null;
            } catch (\mysqli_sql_exception $e) {
                $sent = false;
                $message = $e->getMessage();
            }
            if (false === $sent) {
                // A connection whose dispatch failed never becomes ready;
                // polling it would make the loop below spin forever.
                $messages[] = $message;
                continue;
            }
            $pending[spl_object_hash($connection)] = $connection;
        }

        // Poll only the connections that still owe a result. The loop is
        // skipped entirely when nothing was dispatched, so mysqli_poll() is
        // never called with empty arrays.
        while ($pending) {
            $read = $failed = $rejected = array_values($pending);
            $ready = mysqli_poll($read, $failed, $rejected, 1);
            if (false === $ready) {
                // On failure the arrays are not filtered; try again.
                continue;
            }

            foreach ($read as $link) {
                unset($pending[spl_object_hash($link)]);
                try {
                    $result = $link->reap_async_query();
                } catch (\mysqli_sql_exception $e) {
                    $messages[] = $e->getMessage();
                    continue;
                }
                if (false === $result) {
                    $messages[] = mysqli_error($link);
                    continue;
                }
                if (is_object($result)) {
                    while ($row = $result->fetch_object()) {
                        $ret->push($row);
                    }
                    $result->free();
                } else {
                    // Statements without a result set (insert, delete, ...)
                    // are reported here as `true`.
                    $ret->push($result);
                }
            }

            // Anything flagged as errored or rejected that was not reaped
            // above will never become readable; record it and stop waiting.
            foreach (array_merge($failed, $rejected) as $link) {
                $key = spl_object_hash($link);
                if (isset($pending[$key])) {
                    unset($pending[$key]);
                    $messages[] = mysqli_error($link) ?: 'connection rejected by mysqli_poll()';
                }
            }
        }

        if ($messages) {
            throw new \mysqli_sql_exception(implode('; ', $messages));
        }
        return $ret;
    }
}

注意:并行批量查询需要服务器的php安装mysqlnd扩展。ubuntu下先执行apt-get install php5-mysqlnd,再执行php5enmod mysqlnd;同时mysqli扩展也必须处于启用状态。

由于是并行查询,每次返回结果顺序可能都会有差异。

转载请注明原文地址:https://blog.keepchen.com/a/php-mysqlnd.html