ZeroMQ和non-ZMQ程序之间使用tcp通讯
当使用 tcp:// transport
地址时,ZeroMQ 的 ZMQ_STREAM socket 可以用于在 non-ZMQ socket 发送和接收TCP数据。一个 ZMQ_STREAM socket 能作为客户端和/或服务器,异步的发送和/或接收TCP数据。
- 当接收TCP数据时,ZMQ_STREAM socket 会在源消息之前预添加一个包含源端身份认证(identity )消息。收到的消息是以公平队列的形式发出的。
- 当发送TCP数据时,ZMQ_STREAM socket 会将包含源端身份认证(identity )消息移除。当消息不能被发送时将会触发EHOSTUNREACH或EAGAIN错误。
- 要关闭一个指定的客户端连接,作为服务器,发送身份标识帧后跟一个长度为零的消息即可(见例部分)。
ZMQ_MSGMORE
标识在数据帧是被忽略的。你必须发送一个标识帧后面接一个数据帧。另外,请注意,省略ZMQ_MSGMORE
标志将防止同一socket上发送更多的数据(从任何客户端)。
示例
这是一个使用 ZMQ_STREAM
创建简单的HTTP服务器的示例:
int rc = 0;
void *context = zmq_ctx_new();
if (context != NULL)
{
void *socket = zmq_socket(context, ZMQ_STREAM);
if (socket != NULL)
{
rc = zmq_bind (socket, "tcp://*:5555");
//assert (rc == 0);
zmq_pollitem_t items[1];
/* First item refers to 0MQ socket 'socket'*/
items[0].socket = socket;
items[0].events = ZMQ_POLLIN;
do
{
/* Returned events will be stored in items[].revents */
/* Poll for events indefinitely */
rc = zmq_poll(items, 1, -1);
//ssert (rc >= 0);
if ((items[0].revents & ZMQ_POLLIN) == ZMQ_POLLIN)
{
/* Get HTTP request; ID frame and then request */
uint8_t id[256];
size_t id_size = 256;
id_size = zmq_recv (socket, id, 256, 0);
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
rc = zmq_recvmsg(socket, &msg, 0);
char* data = (char*)zmq_msg_data(&msg);
size_t size = zmq_msg_size(&msg);
qDebug() << data;
qDebug() << size;
char http_response [] =
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
zmq_send(socket, id, id_size, ZMQ_SNDMORE);
zmq_send(socket, http_response, strlen(http_response), ZMQ_SNDMORE);
/* Closes the connection by sending the ID frame followed by a zero response */
zmq_send (socket, id, id_size, ZMQ_SNDMORE);
zmq_send(socket, 0, 0, ZMQ_SNDMORE);
/* NOTE: If we don't use ZMQ_SNDMORE, then we won't be able to send more */
/* message to any client */
rc = zmq_msg_close(&msg);
}
}
while (1);
rc = zmq_close(socket);
socket = NULL;
}
rc = zmq_ctx_term(context);
context = NULL;
}
1