当使用 tcp:// transport 地址时,ZeroMQ 的 ZMQ_STREAM socket 可以用于在 non-ZMQ socket 发送和接收TCP数据。一个 ZMQ_STREAM socket 能作为客户端和/或服务器,异步的发送和/或接收TCP数据。

  • 当接收TCP数据时,ZMQ_STREAM socket 会在源消息之前预添加一个包含源端身份认证(identity )消息。收到的消息是以公平队列的形式发出的。
  • 当发送TCP数据时,ZMQ_STREAM socket 会将包含源端身份认证(identity )消息移除。当消息不能被发送时将会触发EHOSTUNREACH或EAGAIN错误。
  • 要关闭一个指定的客户端连接,作为服务器,发送身份标识帧后跟一个长度为零的消息即可(见例部分)。

ZMQ_MSGMORE 标识在数据帧是被忽略的。你必须发送一个标识帧后面接一个数据帧。另外,请注意,省略ZMQ_MSGMORE 标志将防止同一socket上发送更多的数据(从任何客户端)。

示例

这是一个使用 ZMQ_STREAM 创建简单的HTTP服务器的示例:

int rc = 0;
void *context = zmq_ctx_new();
if (context != NULL)
{
    void *socket = zmq_socket(context, ZMQ_STREAM);
    if (socket != NULL)
    {
        rc = zmq_bind (socket, "tcp://*:5555");
        //assert (rc == 0);
        zmq_pollitem_t items[1];
        /* First item refers to 0MQ socket 'socket'*/
        items[0].socket = socket;
        items[0].events = ZMQ_POLLIN;
        do
        {
            /* Returned events will be stored in items[].revents */
            /* Poll for events indefinitely */
            rc = zmq_poll(items, 1, -1);
            //ssert (rc >= 0);
            if ((items[0].revents & ZMQ_POLLIN) == ZMQ_POLLIN)
            {
                /* Get HTTP request; ID frame and then request */
                uint8_t id[256];
                size_t id_size = 256;
                id_size = zmq_recv (socket, id, 256, 0);
                zmq_msg_t msg;
                rc = zmq_msg_init(&msg);
                rc = zmq_recvmsg(socket, &msg, 0);
                char* data = (char*)zmq_msg_data(&msg);
                size_t size = zmq_msg_size(&msg);
                qDebug() << data;
                qDebug() << size;
                char http_response [] =
                    "HTTP/1.0 200 OK\r\n"
                    "Content-Type: text/plain\r\n"
                    "\r\n"
                    "Hello, World!";
                zmq_send(socket, id, id_size, ZMQ_SNDMORE);
                zmq_send(socket, http_response, strlen(http_response), ZMQ_SNDMORE);

                /* Closes the connection by sending the ID frame followed by a zero response */
                zmq_send (socket, id, id_size, ZMQ_SNDMORE);
                zmq_send(socket, 0, 0, ZMQ_SNDMORE);
                /* NOTE: If we don't use ZMQ_SNDMORE, then we won't be able to send more */
                /* message to any client */
                rc = zmq_msg_close(&msg);
            }
        }
        while (1);
        
        rc = zmq_close(socket);
        socket = NULL;
    }

    rc = zmq_ctx_term(context);
    context = NULL;
}

标签: ZeroMQ, tcp, 通信, non-ZMQ

添加新评论