ws_server.h 3.9 KB
Newer Older
O
overweight 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
/******************************************************************************
 * Copyright (c) Huawei Technologies Co., Ltd. 2019-2019. All rights reserved.
 * iSulad licensed under the Mulan PSL v1.
 * You can use this software according to the terms and conditions of the Mulan PSL v1.
 * You may obtain a copy of Mulan PSL v1 at:
 *     http://license.coscl.org.cn/MulanPSL
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
 * PURPOSE.
 * See the Mulan PSL v1 for more details.
 * Description: websockets server implementation
 * Author: wujing
 * Create: 2019-01-02
 ******************************************************************************/

#ifndef __WEBSOCKET_SERVER_H_
#define __WEBSOCKET_SERVER_H_
#include <array>
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <libwebsockets.h>
#include "route_callback_register.h"
#include "url.h"
#include "errors.h"

/* Size limits and tunables shared by the websocket server code. */
#define MAX_ECHO_PAYLOAD     1024 /* payload bytes per session buffer / protocol rx buffer */
#define MAX_ARRAY_LEN        2    /* element count of session_data::pipes */
#define MAX_BUF_LEN          256
#define MAX_PROTOCOL_NUM     2    /* entries in the protocol table, terminator included */
#define MAX_HTTP_HEADER_POOL 8
#define MIN_VEC_SIZE         3
#define PIPE_FD_NUM          2
#define BUF_BASE_SIZE        1024
#define LWS_TIMEOUT          50   /* NOTE(review): unit not visible in this header - confirm */

/*
 * Per-session user data for the websocket protocol; its sizeof is what the
 * protocol table passes to libwebsockets as the per-session allocation size.
 * Kept as a plain struct with no initializers so the layout stays trivial.
 * NOTE(review): field meanings below are inferred from their names - confirm
 * against the implementation file.
 */
struct per_session_data__echo {
    size_t rx;
    size_t tx;
    /* LWS_PRE bytes of headroom, then up to MAX_ECHO_PAYLOAD payload bytes, plus one spare byte. */
    unsigned char buf[LWS_PRE + MAX_ECHO_PAYLOAD + 1];
    unsigned int len;
    unsigned int index;
    int final;
    int continuation;
    int binary;
};

/* Identifiers for the three standard streams carried over a websocket session. */
enum WebsocketChannel {
    STDINCHANNEL  = 0,
    STDOUTCHANNEL = 1,
    STDERRCHANNEL = 2
};

struct session_data {
    std::array<int, MAX_ARRAY_LEN> pipes;
    unsigned char *buf;
    volatile bool sended { false };
    volatile bool close { false };
    volatile bool in_processing { false };
    std::mutex *buf_mutex;
    std::mutex *sended_mutex;

    void SetProcessingStatus(bool status)
    {
        in_processing = status;
    }
    bool GetProcessingStatus() const
    {
        return in_processing;
    }
};

/*
 * Websocket server built on libwebsockets.
 *
 * The constructor is private and copying is deleted; callers obtain the
 * object through GetInstance(). Session state is kept per connection in a
 * map keyed by the libwebsockets connection handle (struct lws *).
 */
class WebsocketServer {
public:
    /* Returns the server object; the pointer is held in m_instance. */
    static WebsocketServer *GetInstance() noexcept;
    static std::atomic<WebsocketServer *> m_instance;

    /* Starts the server; failures are reported through err. */
    void Start(Errors &err);
    void Wait();
    void Shutdown();

    /* Associates a streaming handler with the given request path. */
    void RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback);
    url::URLDatum GetWebsocketUrl();

    /* Gives access to the per-connection session map (m_wsis). */
    std::map<struct lws *, session_data> &GetWsisData();
    void SetLwsSendedFlag(struct lws *wsi, bool sended);
    void LockAllWsSession();
    void UnlockAllWsSession();

private:
    WebsocketServer();
    WebsocketServer(const WebsocketServer &) = delete;
    WebsocketServer &operator=(const WebsocketServer &) = delete;
    virtual ~WebsocketServer();

    int InitRWPipe(int read_fifo[]);
    std::vector<std::string> split(std::string str, char r);
    static void EmitLog(int level, const char *line);
    int CreateContext();
    inline void Receive(struct lws *client, void *user, void *in, size_t len);
    int Wswrite(struct lws *wsi, void *in, size_t len);
    inline int DumpHandshakeInfo(struct lws *wsi) noexcept;
    /* Protocol callback registered in m_protocols; static so it can be used as a C function pointer. */
    static int Callback(struct lws *wsi, enum lws_callback_reasons reason,
                        void *user, void *in, size_t len);
    void ServiceWorkThread(int threadid);
    void CloseWsSession(struct lws *wsi);
    void CloseAllWsSession();

    /* Data members: declaration order is significant for initialization and is kept unchanged. */
    static std::mutex m_mutex;
    static struct lws_context *m_context;
    /* NOTE(review): volatile is not a synchronization primitive; if this flag is
     * shared with the service thread, std::atomic<int> would be the safe type - confirm usage. */
    volatile int m_force_exit = 0;
    std::thread m_pthread_service;
    /* Protocol table: one "channel.k8s.io" entry followed by the all-null terminator entry. */
    const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = {
        { "channel.k8s.io", Callback, sizeof(struct per_session_data__echo), MAX_ECHO_PAYLOAD, },
        { nullptr, nullptr, 0, 0 }
    };
    RouteCallbackRegister m_handler;
    static std::map<struct lws *, session_data> m_wsis;
    url::URLDatum m_url;
    int m_listenPort = 10251;
};

/*
 * Free-function entry points declared for use outside the class.
 * NOTE(review): both take an opaque `context` pointer; what it must point to
 * is not visible in this header - confirm against the implementation.
 */

/* Writes `len` bytes starting at `data`; returns a ssize_t result (presumably bytes written or a negative error - confirm). */
ssize_t WsWriteToClient(void *context, const void *data, size_t len);
/* Closes the connection identified by `context`; `err` is an out-parameter for an error string (ownership of *err: confirm). */
int closeWsConnect(void *context, char **err);

#endif /*__WEBSOCKET_SERVER_H_*/