libwebsockets

简介

libwebsockets 是一个纯 C 语言的轻量级 WebSocket库,它的 CPU、内存占用很小,同时支持作为服务器端/客户端。其特性包括:

  • 支持 ws://wss:// 协议
  • 可以选择和 OpenSSLCyaSSL 或者 WolfSSL 链接
  • 轻量和高速,即使在每个线程处理多达250个连接的情况下
  • 支持事件循环、零拷贝。支持 poll()libev(epoll)libuv

libwebsockets 提供的 API 相当底层,实现简单的功能也需要相当冗长的代码。

构建

git clone git clone https://github.com/warmcat/libwebsockets.git
cd libwebsockets
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX=/home/alex/CPP/lib/libwebsockets ..
make && make install

Echo 示例

CMake 项目配置

cmake_minimum_required(VERSION 2.8.9)
project(libws-study C)

include_directories(/home/alex/CPP/lib/libwebsockets/include)

set(CMAKE_CXX_FLAGS "-w -pthread")

set(SF_CLIENT client.c)
set(SF_SERVER server.c)

add_executable(client ${SF_CLIENT})
target_link_libraries(client /home/alex/CPP/lib/libwebsockets/lib/libwebsockets.so)

add_executable(server ${SF_SERVER})
target_link_libraries(server /home/alex/CPP/lib/libwebsockets/lib/libwebsockets.so)

客户端

#include "libwebsockets.h"
#include <signal.h>

static volatile int exit_sig = 0;
#define MAX_PAYLOAD_SIZE  10 * 1024

void sighdl( int sig ) {
    lwsl_notice( "%d traped", sig );
    exit_sig = 1;
}

/**
 * 会话上下文对象,结构根据需要自定义
 */
struct session_data {
    int msg_count;
    unsigned char buf[LWS_PRE + MAX_PAYLOAD_SIZE];
    int len;
};

/**
 * 某个协议下的连接发生事件时,执行的回调函数
 *
 * wsi:指向WebSocket实例的指针
 * reason:导致回调的事件
 * user 库为每个WebSocket会话分配的内存空间
 * in 某些事件使用此参数,作为传入数据的指针
 * len 某些事件使用此参数,说明传入数据的长度
 */
int callback( struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len ) {
    struct session_data *data = (struct session_data *) user;
    switch ( reason ) {
        case LWS_CALLBACK_CLIENT_ESTABLISHED:   // 连接到服务器后的回调
            lwsl_notice( "Connected to server\n" );
            break;

        case LWS_CALLBACK_CLIENT_RECEIVE:       // 接收到服务器数据后的回调,数据为in,其长度为len
            lwsl_notice( "Rx: %s\n", (char *) in );
            break;
        case LWS_CALLBACK_CLIENT_WRITEABLE:     // 当此客户端可以发送数据时的回调
            if ( data->msg_count < 3 ) {
                // 前面LWS_PRE个字节必须留给LWS
                memset( data->buf, 0, sizeof( data->buf ));
                char *msg = (char *) &data->buf[ LWS_PRE ];
                data->len = sprintf( msg, "你好 %d", ++data->msg_count );
                lwsl_notice( "Tx: %s\n", msg );
                // 通过WebSocket发送文本消息
                lws_write( wsi, &data->buf[ LWS_PRE ], data->len, LWS_WRITE_TEXT );
            }
            break;
    }
    return 0;
}

/**
 * 支持的WebSocket子协议数组
 * 子协议即JavaScript客户端WebSocket(url, protocols)第2参数数组的元素
 * 你需要为每种协议提供回调函数
 */
struct lws_protocols protocols[] = {
    {
        //协议名称,协议回调,接收缓冲区大小
        "", callback, sizeof( struct session_data ), MAX_PAYLOAD_SIZE,
    },
    {
        NULL, NULL,   0 // 最后一个元素固定为此格式
    }
};

int main() {
    // 信号处理函数
    signal( SIGTERM, sighdl );

    // 用于创建vhost或者context的参数
    struct lws_context_creation_info ctx_info = { 0 };
    ctx_info.port = CONTEXT_PORT_NO_LISTEN;
    ctx_info.iface = NULL;
    ctx_info.protocols = protocols;
    ctx_info.gid = -1;
    ctx_info.uid = -1;

    // 创建一个WebSocket处理器
    struct lws_context *context = lws_create_context( &ctx_info );

    char *address = "192.168.0.89";
    int port = 9090;
    char addr_port[256] = { 0 };
    sprintf( addr_port, "%s:%u", address, port & 65535 );

    // 客户端连接参数
    struct lws_client_connect_info conn_info = { 0 };
    conn_info.context = context;
    conn_info.address = address;
    conn_info.port = port;
    conn_info.ssl_connection = 0;
    conn_info.path = "/h264src";
    conn_info.host = addr_port;
    conn_info.origin = addr_port;
    conn_info.protocol = protocols[ 0 ].name;

    // 下面的调用触发LWS_CALLBACK_PROTOCOL_INIT事件
    // 创建一个客户端连接
    struct lws *wsi = lws_client_connect_via_info( &conn_info );
    while ( !exit_sig ) {
        // 执行一次事件循环(Poll),最长等待1000毫秒
        lws_service( context, 1000 );
        /**
         * 下面的调用的意义是:当连接可以接受新数据时,触发一次WRITEABLE事件回调
         * 当连接正在后台发送数据时,它不能接受新的数据写入请求,所有WRITEABLE事件回调不会执行
         */
        lws_callback_on_writable( wsi );
    }
    // 销毁上下文对象
    lws_context_destroy( context );

    return 0;
}

服务器

static int protocol0_callback( struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len ) {
    struct session_data *data = (struct session_data *) user;
    switch ( reason ) {
        case LWS_CALLBACK_ESTABLISHED:       // 当服务器和客户端完成握手后
            break;
        case LWS_CALLBACK_RECEIVE:           // 当接收到客户端发来的帧以后
            // 判断是否最后一帧
            data->fin = lws_is_final_fragment( wsi );
            // 判断是否二进制消息
            data->bin = lws_frame_is_binary( wsi );
            // 对服务器的接收端进行流量控制,如果来不及处理,可以控制之
            // 下面的调用禁止在此连接上接收数据
            lws_rx_flow_control( wsi, 0 );

            // 业务处理部分,为了实现Echo服务器,把客户端数据保存起来
            memcpy( &data->buf[ LWS_PRE ], in, len );
            data->len = len;

            // 需要给客户端应答时,触发一次写回调
            lws_callback_on_writable( wsi );
            break;
        case LWS_CALLBACK_SERVER_WRITEABLE:   // 当此连接可写时
            lws_write( wsi, &data->buf[ LWS_PRE ], data->len, LWS_WRITE_TEXT );
            // 下面的调用允许在此连接上接收数据
            lws_rx_flow_control( wsi, 1 );
            break;
    }
    // 回调函数最终要返回0,否则无法创建服务器
    return 0;
}

int main() {
    // 信号处理函数
    signal( SIGTERM, sighdl );

    struct lws_context_creation_info ctx_info = { 0 };
    ctx_info.port = 9090;
    ctx_info.iface = NULL; // 在所有网络接口上监听
    ctx_info.protocols = protocols;
    ctx_info.gid = -1;
    ctx_info.uid = -1;
    ctx_info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
    struct lws_context *context = lws_create_context( &ctx_info );
    while ( !exit_sig ) {
        lws_service( context, 1000 );
    }
    lws_context_destroy( context );
}

封装

为了简化编程复杂度,应该考虑对libwebsockets进行适当封装。本节给出一个简单封装的例子。

客户端封装

#ifndef LIVE555_WSCLIENT_H
#define LIVE555_WSCLIENT_H

#include "libwebsockets.h"

#ifndef LWS_MAX_PAYLOAD_SIZE
#define LWS_MAX_PAYLOAD_SIZE  1024 * 1024
#endif

#ifndef SPDLOG_CONST
#define SPDLOG_CONST
const auto LOGGER = spdlog::stdout_color_st( "console" );
#endif

/**
 * 通用回调函数签名
 */
typedef void (*lws_callback)( struct lws *wsi, void *user, void *in, size_t len );

// 用户数据对象
typedef struct lws_user_data {
    // 缓冲区
    unsigned char *buf;
    // 缓冲区有效字节数
    int len;
    // 用户自定义数据
    void *user;
    // 读写缓冲区之前需要加锁
    volatile bool locked;
    // 指示当前缓冲区的数据的重要性,如果为真,发送之前不得被覆盖
    volatile bool critical;
    // 本次数据发送类型
    lws_write_protocol type;
    // 回调函数
    lws_callback esta_callback;
    lws_callback recv_callback;
    lws_callback writ_callback;
};

void writ_callback_send_buf( struct lws *wsi, void *user, void *in, size_t len ) {
    struct lws_user_data *data = (struct lws_user_data *) user;
    if ( __sync_bool_compare_and_swap( &data->locked, 0, 1 )) {
        unsigned char *buf;
        char hex[128]= { 0 };
        int writ_count;
 
        int len = data->len;
        if ( len == 0 ) goto cleanup;
 
        buf = data->buf + LWS_PRE;
        writ_count = lws_write( wsi, buf, len, data->type );
        if ( data->type == LWS_WRITE_BINARY ) {
            char *phex = hex;
            for ( int i = 0; i < 16; i++ ) {
                unsigned char c = *buf++;
                sprintf( phex, "%02x ", c );
                phex += 3;
            }
        }
        LOGGER->debug( "lws_write {} bytes: {}...", writ_count, hex );
        cleanup:
        data->locked = 0;
        data->critical = 0;
        data->len = 0;
    }
}

static int lws_protocol_0_callback( struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len ) {
    struct lws_user_data *data = (struct lws_user_data *) user;
    switch ( reason ) {
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
            if ( data->esta_callback )data->esta_callback( wsi, user, in, len );
            break;
        case LWS_CALLBACK_CLIENT_RECEIVE:
            if ( data->recv_callback )data->recv_callback( wsi, user, in, len );
            break;
        case LWS_CALLBACK_CLIENT_WRITEABLE:
            if ( data->writ_callback )data->writ_callback( wsi, user, in, len );
            break;
    }
    return 0;
}

typedef struct lws_client {
    struct lws *wsi;
    struct lws_context *context;
    lws_user_data *data;
    int *cycle;

    // 连接参数
    char *address;
    char *path;
    int port;

    void (*fill_buf)( lws_client *client, void *buf, int len, lws_write_protocol type );

    void (*fire_writable)( lws_client *client );
};

void fill_buf( lws_client *client, void *buf, int len, lws_write_protocol type ) {
    lws_user_data *data = client->data;
    data->type = type;
    data->len = len;
    memcpy( data->buf + LWS_PRE, buf, len );
}

void fire_writable( lws_client *client ) {
    lws_callback_on_writable( client->wsi );
    // 停止当前事件循环等待
    lws_cancel_service( client->context );
}

void *lws_service_thread_func( void *arg ) {
    lws_client *client = (lws_client *) arg;

    struct lws_context_creation_info ctx_info = { 0 };
    ctx_info.port = CONTEXT_PORT_NO_LISTEN;
    ctx_info.iface = NULL;
    const struct lws_protocols protocols[] = {
        {
            "", lws_protocol_0_callback, sizeof( struct lws_user_data ), LWS_MAX_PAYLOAD_SIZE, 0, 0, LWS_MAX_PAYLOAD_SIZE
        },
        {
            NULL, NULL,                  0
        }
    };
    static const struct lws_extension exts[] = {
        {
            "permessage-deflate",
            lws_extension_callback_pm_deflate,
            "permessage-deflate; client_no_context_takeover; client_max_window_bits"
        },
        { NULL, NULL, NULL /* terminator */ }
    };
    ctx_info.protocols = protocols;
    ctx_info.extensions = exts;
    ctx_info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
    ctx_info.gid = -1;
    ctx_info.uid = -1;

    struct lws_context *context = lws_create_context( &ctx_info );
    client->context = context;

    char addr_port[256] = { 0 };
    sprintf( addr_port, "%s:%u", client->address, client->port & 65535 );

    struct lws_client_connect_info conn_info = { 0 };
    conn_info.context = context;
    conn_info.address = client->address;
    conn_info.port = client->port;
    conn_info.ssl_connection = 0;
    conn_info.path = client->path;
    conn_info.host = addr_port;
    conn_info.origin = addr_port;
    conn_info.protocol = protocols[ 0 ].name;
    // 用户数据对象由调用者提供,因为需要提供回调
    conn_info.userdata = client->data;

    struct lws *wsi = lws_client_connect_via_info( &conn_info );
    client->wsi = wsi;

    int *loop_cycle = client->cycle;
    int cycle = *loop_cycle;
    while ( *loop_cycle >= 0 ) {
        lws_service( context, cycle );
    }
    lws_context_destroy( context );
}

/**
 * 连接到WebSocket服务器
 * @param address  IP地址
 * @param path  上下文路径URL
 * @param port 端口
 * @param data 用户数据
 * @param loop_cycle 事件循环周期,如果大于等于0则启动事件循环,后续将其置为-1则导致循环终止
 * @return
 */
lws_client *lws_connect( char *address, char *path, int port, lws_user_data *data, int loop_cycle ) {
    lws_client *client = (lws_client *) malloc( sizeof( lws_client ));
    client->data = data;
    client->cycle = (int *) malloc( sizeof( int ));
    *client->cycle = loop_cycle;
    client->address = address;
    client->path = path;
    client->port = port;
    client->fill_buf = fill_buf;
    client->fire_writable = fire_writable;
    pthread_t *lws_service_thread = (pthread_t *) malloc( sizeof( pthread_t ));
    pthread_create( lws_service_thread, NULL, lws_service_thread_func, client );
    return client;

}

#endif

使用客户端封装

// 创建用户数据对象
lws_user_data *data = new lws_user_data();
data->buf = new unsigned char[LWS_PRE + LWS_MAX_PAYLOAD_SIZE];
data->writ_callback = writ_callback_send_buf_bin;  // 注册回调

// 创建客户端
lws_client *ws_client = lws_connect( "192.168.0.89", "/h264src", 9090, data, 10 );

// 发送数据,需要同步
lws_user_data *data = client->data;
// GCC内置CAS语义
if ( __sync_bool_compare_and_swap( &data->locked, 0, 1 )) {
    client->fill_buf( client, sink->recvBuf, frameSize );
    client->fire_writable( client );
    data->locked = 0;
}

常见问题

error on reading from skt : 104 错误代码104的含义是连接被重置,我遇到这个问题的原因是,Spring的WebSocket消息缓冲区大小不足。

WebSocket++

简介

WebSocket++ 是一个仅仅由头文件构成的 C++ 库,它实现了 WebSocket 协议(RFC6455),通过它,你可以在 C++ 项目中使用 WebSocket 客户端或者服务器。WebSocket++ 使用两个可以相互替换的网络传输模块,其中一个基于 C++ I/O 流,另一个基于 Asio。

WebSocket++ 的主要特性包括:

  • 事件驱动的接口
  • 支持 WSS、IPv6
  • 灵活的依赖管理 —— Boost或者C++ 11标准库
  • 可移植性:Posix/Windows、32/64bit、Intel/ARM/PPC
  • 线程安全

构建

git clone https://github.com/zaphoyd/websocketpp.git
cd websocketpp
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX=/home/alex/CPP/lib/websocketpp ..
make && make install

Echo 示例

CMake 项目配置

cmake_minimum_required(VERSION 3.6)
project(websocket__)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_FLAGS "-pthread")
add_definitions(-D_WEBSOCKETPP_CPP11_FUNCTIONAL_)
add_definitions(-D_WEBSOCKETPP_CPP11_THREAD_)
add_definitions(-D_WEBSOCKETPP_CPP11_SYSTEM_ERROR_)
add_definitions(-D_WEBSOCKETPP_CPP11_MEMORY_)


include_directories(/home/alex/CPP/lib/websocketpp/include /home/alex/CPP/lib/boost/1.65.1/include/)

set(SF_CLIENT client.cpp)
add_executable(client ${SF_CLIENT})
target_link_libraries(client /home/alex/CPP/lib/boost/1.65.1/lib/libboost_system.so)

set(SF_SERVER server.cpp)
add_executable(server ${SF_SERVER})
target_link_libraries(server /home/alex/CPP/lib/boost/1.65.1/lib/libboost_system.so)

客户端

#include <websocketpp/config/asio_no_tls_client.hpp>
#include <websocketpp/client.hpp>
#include <iostream>

typedef websocketpp::client<websocketpp::config::asio_client> client;

using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;

// 消息指针
typedef websocketpp::config::asio_client::message_type::ptr message_ptr;

// 打开连接时的回调
void on_open( client *c, websocketpp::connection_hdl hdl ) {
    std::string msg = "Hello 1";
    // 发送文本消息
    c->send( hdl, msg, websocketpp::frame::opcode::text );
    c->get_alog().write( websocketpp::log::alevel::app, "Tx: " + msg );

}

// 连接失败时的回调
void on_fail( client *c, websocketpp::connection_hdl hdl ) {
    c->get_alog().write( websocketpp::log::alevel::app, "Connection Failed" );
}

// 接收到服务器发来的WebSocket消息后的回调
void on_message( client *c, websocketpp::connection_hdl hdl, message_ptr msg ) {
    c->get_alog().write( websocketpp::log::alevel::app, "Rx: " + msg->get_payload());
    // 关闭连接,导致事件循环退出
    c->close( hdl, websocketpp::close::status::normal, "" );
}

// 关闭连接时的回调
void on_close( client *c, websocketpp::connection_hdl hdl ) {
}

int main( int argc, char *argv[] ) {
    client echo_client;

    // 调整日志策略
    echo_client.clear_access_channels( websocketpp::log::alevel::frame_header );
    echo_client.clear_access_channels( websocketpp::log::alevel::frame_payload );

    std::string uri = "ws://192.168.0.89:9090/h264src";

    try {
        // 初始化ASIO ASIO
        echo_client.init_asio();

        // 注册回调函数
        echo_client.set_open_handler( std::bind( &on_open, &echo_client, ::_1 ));
        echo_client.set_fail_handler( std::bind( &on_fail, &echo_client, ::_1 ));
        echo_client.set_message_handler( std::bind( &on_message, &echo_client, ::_1, ::_2 ));
        echo_client.set_close_handler( std::bind( &on_close, &echo_client, ::_1 ));

        // 在事件循环启动前创建一个连接对象
        websocketpp::lib::error_code ec;
        client::connection_ptr con = echo_client.get_connection( uri, ec );
        echo_client.connect( con );
        con->get_handle(); // 连接句柄,发送消息时必须要传入

        // 启动事件循环(ASIO的io_service),当前线程阻塞
        echo_client.run();
    } catch ( const std::exception &e ) {
        std::cout << e.what() << std::endl;
    } catch ( websocketpp::lib::error_code e ) {
        std::cout << e.message() << std::endl;
    } catch ( ... ) {
        std::cout << "other exception" << std::endl;
    }
}

服务器

#include <iostream>
#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>

typedef websocketpp::server<websocketpp::config::asio> server;
typedef websocketpp::config::asio::message_type::ptr message_ptr;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;

void on_open( server *s, websocketpp::connection_hdl hdl ) {
    // 根据连接句柄获得连接对象
    server::connection_ptr con = s->get_con_from_hdl( hdl );
    // 获得URL路径
    std::string path = con->get_resource();
    s->get_alog().write( websocketpp::log::alevel::app, "Connected to path " + path );
}

void on_message( server *s, websocketpp::connection_hdl hdl, message_ptr msg ) {
    s->send( hdl, msg->get_payload(), websocketpp::frame::opcode::text );
}

int main() {
    server echo_server;
    // 调整日志策略
    echo_server.set_access_channels( websocketpp::log::alevel::all );
    echo_server.clear_access_channels( websocketpp::log::alevel::frame_payload );

    try {
        echo_server.init_asio();

        echo_server.set_open_handler( bind( &on_open, &echo_server, ::_1 ));
        echo_server.set_message_handler( bind( &on_message, &echo_server, ::_1, ::_2 ));
        // 在所有网络接口的9090上监听
        echo_server.listen( 9090 );

        // 启动服务器端Accept事件循环
        echo_server.start_accept();

        // 启动事件循环(ASIO的io_service),当前线程阻塞
        echo_server.run();
    } catch ( websocketpp::exception const &e ) {
        std::cout << e.what() << std::endl;
    } catch ( ... ) {
        std::cout << "other exception" << std::endl;
    }
}

转载自:基于C/C++的WebSocket库

标签: none

添加新评论