zmq笔记四: tcp的connect操作
// Query the libzmq version that this note is based on.
int major = 0;
int minor = 0;
int patch = 0;
zmq_version (&major, &minor, &patch); // 4.2.0
本文主要是分析代码,方便自己日后查阅.
=========================================
本文以REQ/REP为例,分析一下tcp的connect的实现过程.
// Minimal REQ client: create a context, open a REQ socket on it, and connect the socket.
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:6666"); // for inter-process communication, use "ipc://xxx" instead
int zmq_connect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->connect (addr_);
return result;
}
socket_base_t::connect(addr_)这一个函数就能完成ipc,tcp,udp,inproc等常用通信方式的连接工作,本文只针对tcp连接进行分析.
进入connect()函数后,首先对连接类型(tcp)和目标地址(localhost:6666)进行合法性解析,不同的连接类型,目标地址有不同的格式.tcp连接必须要有ip和端口.
// Connects this socket to the endpoint named by addr_ (a "protocol://address" URI).
// Every exit path shown releases the mutex first. Returns 0 on success (or when a
// duplicate connect is skipped) and -1 on the failure paths shown.
// NOTE: this is an excerpt; the article elides part of the body at the "...." line.
int zmq::socket_base_t::connect (const char *addr_)
{
ENTER_MUTEX ();
// Process pending commands, if any.
int rc = process_commands (0, false); // drain the command queue before doing anything else
if (unlikely (rc != 0)) {
EXIT_MUTEX ();
return -1;
}
// Parse addr_ string.
std::string protocol;
std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
EXIT_MUTEX ();
return -1;
}
// (elided by the article; paddr used further down is presumably set up here - confirm against libzmq)
....
bool is_single_connect = (options.type == ZMQ_DEALER ||
options.type == ZMQ_SUB ||
options.type == ZMQ_REQ); // for these three socket types a repeated connect to an address already in endpoints is skipped (see the lookup below)
if (unlikely (is_single_connect)) {
const endpoints_t::iterator it = endpoints.find (addr_);
if (it != endpoints.end ()) {
EXIT_MUTEX ();
return 0;
}
}
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity); // per the article author: picks the least-loaded I/O thread
if (!io_thread) {
errno = EMTHREAD;
EXIT_MUTEX ();
return -1;
}
// Create session. Per the article author, commands sent to the session from now on land in this I/O thread's mailbox.
session_base_t *session = session_base_t::create (io_thread, true, this,
options, paddr);
errno_assert (session);
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. (same for NORM, currently?)
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
pipe_t *newpipe = NULL;
if (options.immediate != 1 || subscribe_to_all) {
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *new_pipes [2] = {NULL, NULL};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : options.sndhwm,
conflate? -1 : options.rcvhwm};
bool conflates [2] = {conflate, conflate};
rc = pipepair (parents, new_pipes, hwms, conflates); // creates a pair of connected pipes; per the article author each pipe_t holds two ypipe_t queues, inpipe (read side) and outpipe (write side)
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (new_pipes [0], subscribe_to_all); // the first pipe_t is added to the socket's pipes collection
newpipe = new_pipes [0];
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (new_pipes [1]); // the second pipe_t is handed to the session; per the article author the two pipe_t objects were already made each other's peer by pipepair ()
}
// Save last endpoint URI
paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe); // records the endpoint and launches the session, which starts the actual connect
EXIT_MUTEX ();
return 0;
}
首先来看一下attach_pipe()做了什么: (socket和session各有不同的attach_pipe()函数,但功能差不多)
//  Registers pipe_ with this socket and hands it to the concrete socket type.
//  pipe_             - the pipe end owned by the socket.
//  subscribe_to_all_ - forwarded unchanged to xattach_pipe ().
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  Make this socket the pipe's event sink and remember the pipe so it
    //  can be terminated later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Tell the derived socket type about the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    //  Nothing more to do unless the socket is already being closed.
    if (!is_terminating ())
        return;

    //  The socket is closing: expect one more termination ack and ask the
    //  new pipe to terminate straight away.
    register_term_acks (1);
    pipe_->terminate (false);
}
再看下add_endpoint (addr_, (own_t *) session, newpipe);
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{
// Activate the session. Make it a child of this socket.
launch_child (endpoint_); //激活session,把它加入到socket的owned集合
endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe))); //当前endpoints表示socket包含所有对端的信息,endpoints是以目标地址为key的multimap
}
//  Starts object_ as a child of this object: sets this object as its owner,
//  sends it a plug command, then sends an own command for it to this object.
void zmq::own_t::launch_child (own_t *object_)
{
    //  This object becomes the owner of the child.
    object_->set_owner (this);

    //  Send the plug command; its destination is the child itself.
    this->send_plug (object_);

    //  Send the own command addressed to this object, naming the child.
    this->send_own (this, object_);
}
//  Sends a plug command to destination_.
//  inc_seqnum_ - when true, destination_->inc_seqnum () is called before
//                the command is sent.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    //  A plug command carries only its destination and its type.
    command_t plug_cmd;
    plug_cmd.type = command_t::plug;
    plug_cmd.destination = destination_;
    send_command (plug_cmd);
}
笔记三简略介绍了一下mailbox和命令消息队列,在这里看一下实际的消息发送过程.实际上,上面这些类都继承自object_t,object_t类实现了各种send_xxx函数,把特定类型命令消息的发送封装好了.最基本的命令消息必须包括destination和type.消息发送的调用过程如下:
//  Hands cmd_ to the context, addressed by the thread id of the command's
//  destination object.
void zmq::object_t::send_command (command_t &cmd_)
{
    //  The destination's tid selects which mailbox receives the command.
    const uint32_t target_tid = cmd_.destination->get_tid ();
    ctx->send_command (target_tid, cmd_);
}
// Delivers command_ to the mailbox stored in slots at index tid_.
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
// tid_ is used directly as the index into slots; no bounds check is done here.
slots [tid_]->send (command_);
}
//  Queues cmd_ on the command pipe and signals when the flush asks for it.
void zmq::mailbox_t::send (const command_t &cmd_)
{
    //  Write and flush under the sync lock.
    sync.lock ();
    cpipe.write (cmd_, false);
    const bool flushed = cpipe.flush ();
    sync.unlock ();

    //  A true result needs no signal. (Presumably false means the reader
    //  is asleep - confirm against ypipe_t::flush.)
    if (flushed)
        return;

    signaler.send ();
}
//  ypipe_t: stores value_ in the queue. When incomplete_ is true, the
//  "flush up to here" pointer is left where it was.
inline void write (const T &value_, bool incomplete_)
{
    //  Put the value into the current back slot, then push a new
    //  terminator element behind it.
    queue.back () = value_;
    queue.push ();

    //  An incomplete write does not advance the flush pointer.
    if (incomplete_)
        return;

    //  Move the "flush up to here" pointer to the new back element.
    f = &queue.back ();
}
session的get_tid()返回的其实就是创建session时选择的I/O thread的tid. 回顾一下笔记一的create_socket: context创建的每个I/O线程都有一个tid,并且这个tid就是context的slots邮箱管理数组的下标. session的基类object_t的构造函数如下:
// Constructs an object_t that copies its context pointer and thread id from parent_.
zmq::object_t::object_t (object_t *parent_) : // per the article: for a session, parent_ is the I/O thread chosen when the session was created
ctx (parent_->ctx),
tid (parent_->tid)
{
}
综上所述,socket对象调用launch_child(p)时,其实就是把p放进socket的owned集合,再生成一个以p为目标(destination)的plug命令消息,发送到p所属I/O线程的邮箱里,由该I/O线程来处理这个消息.
通过笔记二,三可知道,I/O thread的邮箱有消息需要处理时,是通过邮箱的fd来通知的,而这个fd刚好就是mailbox的signaler的r句柄.也就是说,在mailbox->send()里调用signaler.send()之后,I/O thread轮询的select会发现邮箱fd可读.消息读出来后,交给void zmq::object_t::process_command (command_t &cmd_)处理;它是通过cmd.destination->process_command (cmd)调用的,所以具体的处理函数由destination对象的类型决定:
// Dispatches a command according to its type.
// NOTE: excerpt only; the article elides the other cases and the end of the function.
void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
......
case command_t::plug:
// plug: run the process_plug () handler, then process_seqnum ().
process_plug ();
process_seqnum ();
break;
......
}
//  Plug handler for a session: an active session starts connecting at once
//  (wait_ == false); otherwise nothing happens here.
void zmq::session_base_t::process_plug ()
{
    if (!active)
        return;

    start_connecting (false);
}
// Creates the connecter object for this session's address and launches it as a child.
// wait_ is forwarded to the tcp_connecter_t constructor.
// NOTE: excerpt only; the SOCKS-proxy branch and the non-tcp protocols are elided by the article.
void zmq::session_base_t::start_connecting (bool wait_)
{
zmq_assert (active);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity); // the plug command is finished at this point; creating the connecter is a new piece of work, so first pick an I/O thread for it (a lightly loaded one, per the article author)
zmq_assert (io_thread);
// Create the connecter object.
if (addr->protocol == "tcp") {
if (!options.socks_proxy_address.empty()) {
......
}
else {
tcp_connecter_t *connecter = new (std::nothrow)
tcp_connecter_t (io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter); // same launch_child flow as before, this time invoked on the session; per the article the plug command goes to io_thread's mailbox
}
return;
}
......
}
当I/O thread收到并处理这个plug命令消息时,调用的就是tcp_connecter_t的process_plug()函数了:
//  Plug handler for the connecter: connect right away, or arm the
//  reconnect timer first when a delayed start was requested.
void zmq::tcp_connecter_t::process_plug ()
{
    if (!delayed_start) {
        start_connecting ();
        return;
    }

    //  Delayed start: the connect attempt happens when the timer fires.
    add_reconnect_timer ();
}
当delayed_start为true时,只是加了个timer延迟connect操作,最终还是调用start_connecting ():
//  Timer callback of the connecter. id_ must be one of the two timer ids
//  the connecter uses: connect_timer_id or reconnect_timer_id.
void zmq::tcp_connecter_t::timer_event (int id_)
{
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);

    if (id_ == connect_timer_id) {
        //  Connect timer fired: take the socket out of the poller, close
        //  it and schedule a reconnect.
        connect_timer_started = false;
        rm_fd (handle);
        handle_valid = false;
        close ();
        add_reconnect_timer ();
        return;
    }

    if (id_ == reconnect_timer_id) {
        //  Reconnect timer fired: this is where every retry re-enters.
        reconnect_timer_started = false;
        start_connecting ();
    }
}
在这里必须先说明非阻塞connect()如何完成三次握手的问题:
//////////以下这段文字摘自http://kenby.iteye.com/blog/1183579//////////
步骤1: 设置非阻塞,启动连接
实现非阻塞 connect ,首先把 sockfd 设置成非阻塞的。这样调用
connect 可以立刻返回,根据返回值和 errno 处理三种情况:
(1) 如果返回 0,表示 connect 成功。
(2) 如果返回值小于 0, errno 为 EINPROGRESS, 表示连接
建立已经启动但是尚未完成。这是期望的结果,不是真正的错误。
(3) 如果返回值小于0,errno 不是 EINPROGRESS,则连接出错了。
步骤2:判断可读和可写
然后把 sockfd 加入 select 的读写监听集合,通过 select 判断 sockfd
是否可写,处理三种情况:
(1) 如果连接建立好了,对方没有数据到达,那么 sockfd 是可写的
(2) 如果在 select 之前,连接就建立好了,而且对方的数据已到达,
那么 sockfd 是可读和可写的。
(3) 如果连接发生错误,sockfd 也是可读和可写的。
判断 connect 是否成功,就得区别 (2) 和 (3),这两种情况下 sockfd 都是
可读和可写的,区分的方法是,调用 getsockopt 检查是否出错。
步骤3:使用 getsockopt 函数检查错误
getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len)
在 sockfd 都是可读和可写的情况下,我们使用 getsockopt 来检查连接
是否出错。但这里有一个可移植性的问题。
如果发生错误,getsockopt 源自 Berkeley 的实现将在变量 error 中
返回错误,getsockopt 本身返回0;然而 Solaris 却让 getsockopt 返回 -1,
并把错误保存在 errno 变量中。所以在判断是否有错误的时候,要处理
这两种情况。
//////////以上这段文字摘自http://kenby.iteye.com/blog/1183579//////////
现在来看下start_connecting()到底做了什么:
void zmq::tcp_connecter_t::start_connecting ()
{
// Open the connecting socket.
const int rc = open ();
// Connect may succeed in synchronous manner.
if (rc == 0) { //条件1
handle = add_fd (s);
handle_valid = true;
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else
if (rc == -1 && errno == EINPROGRESS) { //条件2
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno());
// add userspace connect timeout
add_connect_timer ();
}
// Handle any other error condition by eventual reconnect.
else { //条件3
if (s != retired_fd)
close ();
add_reconnect_timer ();
}
}
open()函数的主要工作是创建新套接字句柄s,把它设置为非阻塞(non-blocking),然后调用 ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); 由于是非阻塞的,connect()调用会立即返回:通常返回-1并把errno设置为EINPROGRESS,表示连接操作还在进行中(也可能直接返回0,表示连接已经建立,对应下文的条件1).此时三次握手仍在后台进行,握手是否完成可以在poller的select()调用里知道结果. open()里对connect()错误码的处理如下:
// Fragment: error-code normalisation (per the article, taken from open () after the non-blocking ::connect () call).
#ifdef ZMQ_HAVE_WINDOWS
// Windows: WSAEINPROGRESS and WSAEWOULDBLOCK are both reported as EINPROGRESS;
// every other Winsock error is translated by wsa_error_to_errno ().
const int last_error = WSAGetLastError();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else
errno = wsa_error_to_errno (last_error);
#else
// Other platforms: EINTR is reported as EINPROGRESS.
if (errno == EINTR)
errno = EINPROGRESS;
#endif
假如client发起连接时,对端还没启动listen,那么会进入start_connecting()的条件2:把s加入到poller的error集合(add_fd(s)只是把s加入error集合,见笔记二),再通过set_pollout()把s加入write集合,此时s还没完成三次握手.如果options.connect_timeout > 0,再给它加一个connect_timer_id的timer,然后等待I/O线程的poller轮询select(). 由于对端还没有listen,套接字s会发生错误,从而触发tcp_connecter_t::in_event()(如果连接成功,则触发tcp_connecter_t::out_event()). 而对于tcp_connecter_t来说,in_event()调用的还是out_event(),所以s可写或出错时最终调用的都是同一个函数.
// Called when the connecting socket is reported ready. Checks the outcome of the
// connect attempt via connect (): on failure it closes the socket and schedules a
// reconnect; on success it tunes the socket, creates a stream engine for it, attaches
// the engine to the session and terminates this connecter.
void zmq::tcp_connecter_t::out_event ()
{
if (connect_timer_started) { // cancel the connect timer if one is running
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
rm_fd (handle); // remove s from the poller
handle_valid = false;
const fd_t fd = connect (); // checks the state of s to decide whether the handshake succeeded and returns the matching fd value
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { // the connect attempt on s failed
close ();// close socket s
add_reconnect_timer (); // and schedule a reconnect timer
return;
}
// Reaching this point means the connect on s succeeded and the connection is established.
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd (fd);
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (session, engine);
// Shut the connecter down.
terminate ();
socket->event_connected (endpoint, (int) fd);
}
三次握手是否成功是由tcp_connecter_t::connect ()判断并返回fd,如果成功了,就执行后面的代码; 如果失败就加一个重连的timer,这个timer的处理函数上文已经给出了,最终还是调用start_connecting (),不断循环,直到连接成功为止.