从predis组件开始了解如何与redis服务交互

之前有一段时间我很好奇,使用编程语言编写的诸如mysql、redis、memcached等服务的客户端组件是如何进行通讯的,直到前段时间有闲暇,分析了一下一个使用php编写的redis组件predis的源码,发现了一些好玩的东西。

  • predis是如何与redis服务进行通讯的

predis的客户端类Client (~/predis/predis/src/Client.php)

连接

    /**
     * Creates single or aggregate connections from different types of arguments
     * (string, array) or returns the passed argument if it is an instance of a
     * class implementing Predis\Connection\ConnectionInterface.
     *
     * Accepted types for connection parameters are:
     *
     *  - Instance of Predis\Connection\ConnectionInterface.
     *  - Instance of Predis\Connection\ParametersInterface.
     *  - Array
     *  - String
     *  - Callable
     *
     * @param mixed $parameters Connection parameters or connection instance.
     *
     * @throws \InvalidArgumentException
     *
     * @return ConnectionInterface
     */
    protected function createConnection($parameters)
    {
        if ($parameters instanceof ConnectionInterface) {
            return $parameters;
        }
        if ($parameters instanceof ParametersInterface || is_string($parameters)) {
            return $this->options->connections->create($parameters);
        }
        if (is_array($parameters)) {
            if (!isset($parameters[0])) {
                return $this->options->connections->create($parameters);
            }
            $options = $this->options;
            if ($options->defined('aggregate')) {
                $initializer = $this->getConnectionInitializerWrapper($options->aggregate);
                $connection = $initializer($parameters, $options);
            } elseif ($options->defined('replication')) {
                $replication = $options->replication;
                if ($replication instanceof AggregateConnectionInterface) {
                    $connection = $replication;
                    $options->connections->aggregate($connection, $parameters);
                } else {
                    $initializer = $this->getConnectionInitializerWrapper($replication);
                    $connection = $initializer($parameters, $options);
                }
            } else {
                $connection = $options->cluster;
                $options->connections->aggregate($connection, $parameters);
            }
            return $connection;
        }
        if (is_callable($parameters)) {
            $initializer = $this->getConnectionInitializerWrapper($parameters);
            $connection = $initializer($this->options);
            return $connection;
        }
        throw new \InvalidArgumentException('Invalid type for connection parameters.');
    }

连接工厂类 (~/predis/predis/src/Connection/Factory.php)

predis实现了以下连接方式进行redis连接

protected $schemes = array(
        'tcp' => 'Predis\Connection\StreamConnection',
        'unix' => 'Predis\Connection\StreamConnection',
        'tls' => 'Predis\Connection\StreamConnection',
        'redis' => 'Predis\Connection\StreamConnection',
        'rediss' => 'Predis\Connection\StreamConnection',
        'http' => 'Predis\Connection\WebdisConnection',
    );

我们就选择其中一个进行分析:TCP方式

流连接处理类 (~/predis/predis/src/Connection/StreamConnection.php)

/**
 * {@inheritdoc}
 */
protected function createResource()
{
    switch ($this->parameters->scheme) {
        case 'tcp':
        case 'redis':
            return $this->tcpStreamInitializer($this->parameters);
        case 'unix':
            return $this->unixStreamInitializer($this->parameters);
        case 'tls':
        case 'rediss':
            return $this->tlsStreamInitializer($this->parameters);
        default:
            throw new \InvalidArgumentException("Invalid scheme: '{$this->parameters->scheme}'.");
    }
}

...


    /**
     * Initializes a TCP stream resource.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return resource
     */
    protected function tcpStreamInitializer(ParametersInterface $parameters)
    {
        if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
            $address = "tcp://$parameters->host:$parameters->port";
        } else {
            $address = "tcp://[$parameters->host]:$parameters->port";
        }
        $flags = STREAM_CLIENT_CONNECT;
        if (isset($parameters->async_connect) && $parameters->async_connect) {
            $flags |= STREAM_CLIENT_ASYNC_CONNECT;
        }
        if (isset($parameters->persistent)) {
            if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
                $flags |= STREAM_CLIENT_PERSISTENT;
                if ($persistent === null) {
                    $address = "{$address}/{$parameters->persistent}";
                }
            }
        }
        $resource = $this->createStreamSocket($parameters, $address, $flags);
        return $resource;
    }

...


    /**
     * Creates a connected stream socket resource.
     *
     * @param ParametersInterface $parameters Connection parameters.
     * @param string              $address    Address for stream_socket_client().
     * @param int                 $flags      Flags for stream_socket_client().
     *
     * @return resource
     */
    protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
    {
        $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
        if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
            $this->onConnectionError(trim($errstr), $errno);
        }
        if (isset($parameters->read_write_timeout)) {
            $rwtimeout = (float) $parameters->read_write_timeout;
            $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
            $timeoutSeconds = floor($rwtimeout);
            $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
            stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
        }
        if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
            $socket = socket_import_stream($resource);
            socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
        }
        return $resource;
    }

读写数据

StreamConnection (~/predis/predis/src/Connection/StreamConnection.php)

/**
     * Performs a write operation over the stream of the buffer containing a
     * command serialized with the Redis wire protocol.
     *
     * @param string $buffer Representation of a command in the Redis wire protocol.
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();
        while (($length = strlen($buffer)) > 0) {
            $written = @fwrite($socket, $buffer);
            if ($length === $written) {
                return;
            }
            if ($written === false || $written === 0) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }
            $buffer = substr($buffer, $written);
        }
    }
    /**
     * {@inheritdoc}
     */
    public function read()
    {
        $socket = $this->getResource();
        $chunk = fgets($socket);
        if ($chunk === false || $chunk === '') {
            $this->onConnectionError('Error while reading line from the server.');
        }
        $prefix = $chunk[0];
        $payload = substr($chunk, 1, -2);
        switch ($prefix) {
            case '+':
                return StatusResponse::get($payload);
            case '$':
                $size = (int) $payload;
                if ($size === -1) {
                    return;
                }
                $bulkData = '';
                $bytesLeft = ($size += 2);
                do {
                    $chunk = fread($socket, min($bytesLeft, 4096));
                    if ($chunk === false || $chunk === '') {
                        $this->onConnectionError('Error while reading bytes from the server.');
                    }
                    $bulkData .= $chunk;
                    $bytesLeft = $size - strlen($bulkData);
                } while ($bytesLeft > 0);
                return substr($bulkData, 0, -2);
            case '*':
                $count = (int) $payload;
                if ($count === -1) {
                    return;
                }
                $multibulk = array();
                for ($i = 0; $i < $count; ++$i) {
                    $multibulk[$i] = $this->read();
                }
                return $multibulk;
            case ':':
                $integer = (int) $payload;
                return $integer == $payload ? $integer : $payload;
            case '-':
                return new ErrorResponse($payload);
            default:
                $this->onProtocolError("Unknown response prefix: '$prefix'.");
                return;
        }
    }

在读取和写入方法中,发现predis是从stream流中读取和写入数据的,在读取方法中,能看到从stream流中读取的最先进行了前缀分析,我想应该是和redis的通讯协议有关,因此,先了解一下redis的通讯协议。


Redis 通讯协议分析

1.协议简介

Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol) 的网络通信协议交换数据。 这种协议采用明文传输,易读也易解析。Redis客户端采用此协议格式来对服务端发送不同的命令,服务端会根据具体的操作而返回具体的答复。客户端和服务端采用的是简单的请求-响应模型进行通信的。

2.协议格式

协议的第一个字符就表示当前包的类型,数据是以\r\n进行换行的,现在有如下几种类型:

2.1.状态消息(+),一般表示正确的状态消息。字符后面是具体消息。例如操作OK对应的消息格式为:+OK\r\n

2.2.错误消息(-),一般表示操作出错消息。字符后面则是消息内容。 例如一个key不存在对应的消息格式为:-No such key\r\n

2.3.整数(:),表示数字。例如strlen命令的操作返回则是此种类型。例如key为a的字符串值为abc,则strlen命令返回的消息格式为::3\r\n

2.4.字符串($),表示字符串。例如key为a的字符串值为abc,则get a命令返回的是 $3\r\nabc\r\n,字符后面的数字表示字符串长度,后面则是数据内容

2.5.批量字符串(),表示多个字符串。例如key为a的字符串值为abc,key为b的字符串值为xyz,则 mget a b 命令返回的消息格式是:2\r\n$3\r\nabc\r\n$3\r\nxyz\r\n。字符后面的数字表示后面有多少个字符串。

3.命令测试

set name admin Request: *3\r\n $3\r\n set\r\n $4\r\n name\r\n $5\r\n admin\r\n

Response: +OK\r\n

get name Request: *2\r\n $3\r\n get\r\n $4\r\n name\r\n

Response $5\r\n admin\r\n

get age Request: *2\r\n $3\r\n get\r\n $3\r\n age\r\n

Response: :-1\r\n


StreamConnection (~/predis/predis/src/Connection/SteamConnection.php)

    /**
     * {@inheritdoc}
     */
    public function writeRequest(CommandInterface $command)
    {
        $commandID = $command->getId();
        $arguments = $command->getArguments();
        $cmdlen = strlen($commandID);
        $reqlen = count($arguments) + 1;
        $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";
        foreach ($arguments as $argument) {
            $arglen = strlen($argument);
            $buffer .= "\${$arglen}\r\n{$argument}\r\n";
        }
        $this->write($buffer);
    }

以上方法则是对操作命令进行拼接,并向连接句柄中写入请求命令。

总结

通过以上一系列的代码分析,再结合协议分析文章,最终了解到了通过编程语言实现(其他语言类似)与redis服务的交互流程:
1.使用如tcp等连接方式连接到redis服务
2.向连接句柄中写入/读取数据
3.解析数据

predis项目github地址:nrk/predis

转载请注明原文地址:https://blog.keepchen.com/a/predis-connect-to-redis.html