之前有一段时间我很好奇,使用编程语言编写的诸如mysql、redis、memcached等服务的客户端组件是如何进行通讯的,直到前段时间有闲暇,分析了一下一个使用php编写的redis组件predis的源码,发现了一些好玩的东西。
- predis是如何与redis服务进行通讯的
predis的客户端类Client (~/predis/predis/src/Client.php)
连接
/**
* Creates single or aggregate connections from different types of arguments
* (string, array) or returns the passed argument if it is an instance of a
* class implementing Predis\Connection\ConnectionInterface.
*
* Accepted types for connection parameters are:
*
* - Instance of Predis\Connection\ConnectionInterface.
* - Instance of Predis\Connection\ParametersInterface.
* - Array
* - String
* - Callable
*
* @param mixed $parameters Connection parameters or connection instance.
*
* @throws \InvalidArgumentException
*
* @return ConnectionInterface
*/
protected function createConnection($parameters)
{
if ($parameters instanceof ConnectionInterface) {
return $parameters;
}
if ($parameters instanceof ParametersInterface || is_string($parameters)) {
return $this->options->connections->create($parameters);
}
if (is_array($parameters)) {
if (!isset($parameters[0])) {
return $this->options->connections->create($parameters);
}
$options = $this->options;
if ($options->defined('aggregate')) {
$initializer = $this->getConnectionInitializerWrapper($options->aggregate);
$connection = $initializer($parameters, $options);
} elseif ($options->defined('replication')) {
$replication = $options->replication;
if ($replication instanceof AggregateConnectionInterface) {
$connection = $replication;
$options->connections->aggregate($connection, $parameters);
} else {
$initializer = $this->getConnectionInitializerWrapper($replication);
$connection = $initializer($parameters, $options);
}
} else {
$connection = $options->cluster;
$options->connections->aggregate($connection, $parameters);
}
return $connection;
}
if (is_callable($parameters)) {
$initializer = $this->getConnectionInitializerWrapper($parameters);
$connection = $initializer($this->options);
return $connection;
}
throw new \InvalidArgumentException('Invalid type for connection parameters.');
}
连接工厂类 (~/predis/predis/src/Connection/Factory.php)
predis实现了以下连接方式进行redis连接
protected $schemes = array(
'tcp' => 'Predis\Connection\StreamConnection',
'unix' => 'Predis\Connection\StreamConnection',
'tls' => 'Predis\Connection\StreamConnection',
'redis' => 'Predis\Connection\StreamConnection',
'rediss' => 'Predis\Connection\StreamConnection',
'http' => 'Predis\Connection\WebdisConnection',
);
我们就选择其中一个进行分析:TCP方式
流连接处理类 (~/predis/predis/src/Connection/StreamConnection.php)
/**
* {@inheritdoc}
*/
protected function createResource()
{
switch ($this->parameters->scheme) {
case 'tcp':
case 'redis':
return $this->tcpStreamInitializer($this->parameters);
case 'unix':
return $this->unixStreamInitializer($this->parameters);
case 'tls':
case 'rediss':
return $this->tlsStreamInitializer($this->parameters);
default:
throw new \InvalidArgumentException("Invalid scheme: '{$this->parameters->scheme}'.");
}
}
...
/**
* Initializes a TCP stream resource.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @return resource
*/
protected function tcpStreamInitializer(ParametersInterface $parameters)
{
if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
$address = "tcp://$parameters->host:$parameters->port";
} else {
$address = "tcp://[$parameters->host]:$parameters->port";
}
$flags = STREAM_CLIENT_CONNECT;
if (isset($parameters->async_connect) && $parameters->async_connect) {
$flags |= STREAM_CLIENT_ASYNC_CONNECT;
}
if (isset($parameters->persistent)) {
if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
$flags |= STREAM_CLIENT_PERSISTENT;
if ($persistent === null) {
$address = "{$address}/{$parameters->persistent}";
}
}
}
$resource = $this->createStreamSocket($parameters, $address, $flags);
return $resource;
}
...
/**
* Creates a connected stream socket resource.
*
* @param ParametersInterface $parameters Connection parameters.
* @param string $address Address for stream_socket_client().
* @param int $flags Flags for stream_socket_client().
*
* @return resource
*/
protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
{
$timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
$this->onConnectionError(trim($errstr), $errno);
}
if (isset($parameters->read_write_timeout)) {
$rwtimeout = (float) $parameters->read_write_timeout;
$rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
$timeoutSeconds = floor($rwtimeout);
$timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
}
if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
$socket = socket_import_stream($resource);
socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
}
return $resource;
}
读写数据
StreamConnection (~/predis/predis/src/Connection/StreamConnection.php)
/**
* Performs a write operation over the stream of the buffer containing a
* command serialized with the Redis wire protocol.
*
* @param string $buffer Representation of a command in the Redis wire protocol.
*/
protected function write($buffer)
{
$socket = $this->getResource();
while (($length = strlen($buffer)) > 0) {
$written = @fwrite($socket, $buffer);
if ($length === $written) {
return;
}
if ($written === false || $written === 0) {
$this->onConnectionError('Error while writing bytes to the server.');
}
$buffer = substr($buffer, $written);
}
}
/**
* {@inheritdoc}
*/
public function read()
{
$socket = $this->getResource();
$chunk = fgets($socket);
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading line from the server.');
}
$prefix = $chunk[0];
$payload = substr($chunk, 1, -2);
switch ($prefix) {
case '+':
return StatusResponse::get($payload);
case '$':
$size = (int) $payload;
if ($size === -1) {
return;
}
$bulkData = '';
$bytesLeft = ($size += 2);
do {
$chunk = fread($socket, min($bytesLeft, 4096));
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading bytes from the server.');
}
$bulkData .= $chunk;
$bytesLeft = $size - strlen($bulkData);
} while ($bytesLeft > 0);
return substr($bulkData, 0, -2);
case '*':
$count = (int) $payload;
if ($count === -1) {
return;
}
$multibulk = array();
for ($i = 0; $i < $count; ++$i) {
$multibulk[$i] = $this->read();
}
return $multibulk;
case ':':
$integer = (int) $payload;
return $integer == $payload ? $integer : $payload;
case '-':
return new ErrorResponse($payload);
default:
$this->onProtocolError("Unknown response prefix: '$prefix'.");
return;
}
}
在读取和写入方法中,发现predis是从stream流中读取和写入数据的,在读取方法中,能看到从stream流中读取的最先进行了前缀分析,我想应该是和redis的通讯协议有关,因此,先了解一下redis的通讯协议。
1.协议简介
Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol) 的网络通信协议交换数据。 这种协议采用明文传输,易读也易解析。Redis客户端采用此协议格式来对服务端发送不同的命令,服务端会根据具体的操作而返回具体的答复。客户端和服务端采用的是简单的请求-响应模型进行通信的。
2.协议格式
协议的第一个字符就表示当前包的类型,数据是以\r\n进行换行的,现在有如下几种类型:
2.1.状态消息(+),一般表示正确的状态消息。字符后面是具体消息。例如操作OK对应的消息格式为:+OK\r\n
2.2.错误消息(-),一般表示操作出错消息。字符后面则是消息内容。 例如一个key不存在对应的消息格式为:-No such key\r\n
2.3.整数(:),表示数字。例如strlen命令的操作返回则是此种类型。例如key为a的字符串值为abc,则strlen命令返回的消息格式为::3\r\n
2.4.字符串($),表示字符串。例如key为a的字符串值为abc,则get a命令返回的是 $3\r\nabc\r\n,字符后面的数字表示字符串长度,后面则是数据内容
2.5.批量字符串(),表示多个字符串。例如key为a的字符串值为abc,key为b的字符串值为xyz,则 mget a b 命令返回的消息格式是:2\r\n$3\r\nabc\r\n$3\r\nxyz\r\n。字符后面的数字表示后面有多少个字符串。
3.命令测试
set name admin Request: *3\r\n $3\r\n set\r\n $4\r\n name\r\n $5\r\n admin\r\n
Response: +OK\r\n
get name Request: *2\r\n $3\r\n get\r\n $4\r\n name\r\n
Response $5\r\n admin\r\n
get age Request: *2\r\n $3\r\n get\r\n $3\r\n age\r\n
Response: :-1\r\n
StreamConnection (~/predis/predis/src/Connection/SteamConnection.php)
/**
* {@inheritdoc}
*/
public function writeRequest(CommandInterface $command)
{
$commandID = $command->getId();
$arguments = $command->getArguments();
$cmdlen = strlen($commandID);
$reqlen = count($arguments) + 1;
$buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";
foreach ($arguments as $argument) {
$arglen = strlen($argument);
$buffer .= "\${$arglen}\r\n{$argument}\r\n";
}
$this->write($buffer);
}
以上方法则是对操作命令进行拼接,并向连接句柄中写入请求命令。
总结
通过以上一系列的代码分析,再结合协议分析文章,最终了解到了通过编程语言实现(其他语言类似)与redis服务的交互流程:
1.使用如tcp等连接方式连接到redis服务
2.向连接句柄中写入/读取数据
3.解析数据
predis项目github地址:nrk/predis