ws_server.h 3.9 KB
Newer Older
O
overweight 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
/******************************************************************************
 * Copyright (c) Huawei Technologies Co., Ltd. 2019-2019. All rights reserved.
 * iSulad licensed under the Mulan PSL v1.
 * You can use this software according to the terms and conditions of the Mulan PSL v1.
 * You may obtain a copy of Mulan PSL v1 at:
 *     http://license.coscl.org.cn/MulanPSL
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
 * PURPOSE.
 * See the Mulan PSL v1 for more details.
 * Description: websockets server implementation
 * Author: wujing
 * Create: 2019-01-02
 ******************************************************************************/

#ifndef __WEBSOCKET_SERVER_H_
#define __WEBSOCKET_SERVER_H_
#include <vector>
#include <map>
#include <string>
#include <mutex>
#include <atomic>
#include <memory>
#include <thread>
#include <libwebsockets.h>
#include "route_callback_register.h"
#include "url.h"
#include "errors.h"

#define MAX_ECHO_PAYLOAD 1024
#define MAX_ARRAY_LEN 2
#define MAX_BUF_LEN 256
#define MAX_PROTOCOL_NUM 2
#define MAX_HTTP_HEADER_POOL 8
#define MIN_VEC_SIZE 3
#define PIPE_FD_NUM 2
#define BUF_BASE_SIZE 1024
#define LWS_TIMEOUT 50

struct per_session_data__echo {
    size_t rx, tx;
    unsigned char buf[LWS_PRE + MAX_ECHO_PAYLOAD + 1];
    unsigned int len;
    unsigned int index;
    int final;
    int continuation;
    int binary;
};

enum WebsocketChannel {
    STDINCHANNEL = 0,
    STDOUTCHANNEL,
    STDERRCHANNEL
};

struct session_data {
    std::array<int, MAX_ARRAY_LEN> pipes;
    unsigned char *buf;
    volatile bool sended { false };
    volatile bool close { false };
    volatile bool in_processing { false };
    std::mutex *buf_mutex;
    std::mutex *sended_mutex;

    void SetProcessingStatus(bool status)
    {
        in_processing = status;
    }
    bool GetProcessingStatus() const
    {
        return in_processing;
    }
};

class WebsocketServer {
public:
    static WebsocketServer *GetInstance() noexcept;
    static std::atomic<WebsocketServer *> m_instance;
    void Start(Errors &err);
    void Wait();
    void Shutdown();
    void RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback);
    url::URLDatum GetWebsocketUrl();
    std::map<struct lws *, session_data> &GetWsisData();
    void SetLwsSendedFlag(struct lws *wsi, bool sended);
    void LockAllWsSession();
    void UnlockAllWsSession();

private:
    WebsocketServer();
    WebsocketServer(const WebsocketServer &) = delete;
    WebsocketServer &operator=(const WebsocketServer &) = delete;
    ~WebsocketServer();
    int InitRWPipe(int read_fifo[]);
    std::vector<std::string> split(std::string str, char r);
    static void EmitLog(int level, const char *line);
    int CreateContext();
    inline void Receive(struct lws *client, void *user, void *in, size_t len);
    int  Wswrite(struct lws *wsi, void *in, size_t len);
    inline int DumpHandshakeInfo(struct lws *wsi) noexcept;
    static int Callback(struct lws *wsi, enum lws_callback_reasons reason,
                        void *user, void *in, size_t len);
    void ServiceWorkThread(int threadid);
    void CloseWsSession(struct lws *wsi);
    void CloseAllWsSession();

private:
    static std::mutex m_mutex;
    static struct lws_context *m_context;
    volatile int m_force_exit = 0;
    std::thread m_pthread_service;
    const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = {
        {  "channel.k8s.io", Callback, sizeof(struct per_session_data__echo), MAX_ECHO_PAYLOAD, },
        { NULL, NULL, 0, 0 }
    };
    RouteCallbackRegister m_handler;
    static std::map<struct lws *, session_data> m_wsis;
    url::URLDatum m_url;
    int m_listenPort = 10251;
};

ssize_t WsWriteToClient(void *context, const void *data, size_t len);
int closeWsConnect(void *context, char **err);

#endif /*__WEBSOCKET_SERVER_H_*/