transComm.h 8.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"

/*
 * Intrusive circular doubly-linked list node.
 * Slot [0] stores the pointer to the next node, slot [1] the pointer to the
 * previous node. A list head is itself a `queue`; an empty list points at
 * itself in both directions.
 *
 * All macros below take `queue*` arguments and may evaluate them more than
 * once, so arguments must be free of side effects.
 */
typedef void* queue[2];

/* Private macros: assignable lvalues for the next/previous link of `q`. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))

#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Initialize an empty queue (both links point back at `q`).
 * Statement macros are wrapped in do { } while (0) so that they behave like
 * a single statement, e.g. as the unbraced body of an if/else. */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))

/* Insert element `e` at the back of queue `q`. */
#define QUEUE_PUSH(q, e)           \
  do {                             \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  } while (0)

/* Remove the given element from the queue. Any element can be removed at any
 * time. The links stored in `e` itself are left unchanged. */
#define QUEUE_REMOVE(e)                 \
  do {                                  \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  } while (0)
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
/*
 * Split queue `h` at element `q`: `q` and every element after it (up to the
 * tail of `h`) are moved into `n`, and `h` keeps the elements that preceded
 * `q`. `n` is overwritten and does not need to be initialized beforehand.
 */
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

/*
 * Move every element of `h` into `n`, leaving `h` empty. When `h` is already
 * empty, `n` is simply initialized as an empty queue.
 *
 * The temporary uses a deliberately unusual name: a plain `q` would capture
 * a caller argument that is itself spelled `q` and read it uninitialized.
 */
#define QUEUE_MOVE(h, n)                      \
  do {                                        \
    if (QUEUE_IS_EMPTY(h)) {                  \
      QUEUE_INIT(n);                          \
    } else {                                  \
      queue* queueMoveFirst_ = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, queueMoveFirst_, n);     \
    }                                         \
  } while (0)
dengyihao's avatar
dengyihao 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

/* First element of the queue headed by `h`. */
#define QUEUE_HEAD(h) (QUEUE_NEXT(h))

/* Last element of the queue headed by `h`. */
#define QUEUE_TAIL(h) (QUEUE_PREV(h))

/*
 * Visit each element of the queue headed by `h`, front to back, storing the
 * current node in `it`. Mutating the queue while iterating results in
 * undefined behavior.
 */
#define QUEUE_FOREACH(it, h) \
  for ((it) = QUEUE_NEXT(h); (it) != (h); (it) = QUEUE_NEXT(it))

/*
 * Recover the enclosing structure of type `type` from `ptr`, a pointer to its
 * embedded queue member named `field`.
 */
#define QUEUE_DATA(ptr, type, field) \
  ((type*)((void*)((char*)(ptr)-offsetof(type, field))))

typedef struct {
dengyihao's avatar
dengyihao 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
  SRpcInfo* pRpc;     // associated SRpcInfo
  SEpSet    epSet;    // ip list provided by app
  void*     ahandle;  // handle provided by app
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
  int8_t   connType;  // connection type
  int64_t  rid;       // refId returned by taosAddRef
  SRpcMsg* pRsp;      // for synchronous API
  tsem_t*  pSem;      // for synchronous API
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
121 122
} SRpcReqContext;

dengyihao's avatar
formate  
dengyihao 已提交
123 124
typedef SRpcMsg      STransMsg;
typedef SRpcInfo     STrans;
U
ubuntu 已提交
125 126
typedef SRpcConnInfo STransHandleInfo;

dengyihao's avatar
dengyihao 已提交
127
typedef struct {
dengyihao's avatar
dengyihao 已提交
128 129
  SEpSet epSet;    // ip list provided by app
  void*  ahandle;  // handle provided by app
dengyihao's avatar
dengyihao 已提交
130 131 132 133 134 135 136 137
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
dengyihao's avatar
dengyihao 已提交
138 139 140
  int8_t  connType;  // connection type
  int64_t rid;       // refId returned by taosAddRef

U
ubuntu 已提交
141
  STransMsg* pRsp;  // for synchronous API
dengyihao's avatar
formate  
dengyihao 已提交
142
  tsem_t*    pSem;  // for synchronous API
dengyihao's avatar
dengyihao 已提交
143

dengyihao's avatar
dengyihao 已提交
144
  int      hThrdIdx;
dengyihao's avatar
dengyihao 已提交
145 146 147 148 149 150 151 152 153 154 155
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
} STransConnCtx;

#pragma pack(push, 1)

/*
 * Header that precedes every transport message body. The flag bit-fields
 * occupy 16 bits in total (4 + 4, then 2 + 1 + 2 + 3), followed by three
 * 32-bit fields; `content` marks where the body starts.
 *
 * This definition sits inside the file's `#pragma pack(push, 1)` region.
 *
 * NOTE(review): the bit-fields use plain `char`, whose signedness is
 * implementation-defined. Where `char` is signed, the 1-bit `spi` field can
 * only hold 0 or -1, so a comparison such as `spi == 1` would never be true.
 * Confirm how readers test these fields before changing the type.
 */
typedef struct {
  char version : 4;  // RPC version
  char comp : 4;     // compression algorithm, 0:no compression 1:lz4
  char resflag : 2;  // reserved bits
  char spi : 1;      // security parameter index
  char secured : 2;
  char encrypt : 3;  // encrypt algorithm, 0: no encryption

  uint32_t code;  // del later
  uint32_t msgType;
  int32_t  msgLen;
  uint8_t  content[0];  // message body starts from here
} STransMsgHead;

/*
 * Two 32-bit fields describing compressed content.
 * NOTE(review): presumably placed in front of a compressed message body —
 * confirm against transCompressMsg()/transDecompressMsg().
 */
typedef struct {
  int32_t reserved;
  int32_t contLen;  // NOTE(review): presumably the original (uncompressed) content length — confirm
} STransCompMsg;

/*
 * Timestamp plus a TSDB_AUTH_LEN-byte authentication value.
 * NOTE(review): presumably produced by transBuildAuthHead() and checked by
 * transAuthenticateMsg() — confirm against their definitions.
 */
typedef struct {
  uint32_t timeStamp;
  uint8_t  auth[TSDB_AUTH_LEN];
} STransDigestMsg;

dengyihao's avatar
dengyihao 已提交
176 177
typedef struct {
  uint8_t user[TSDB_UNI_LEN];
dengyihao's avatar
dengyihao 已提交
178
  uint8_t secret[TSDB_PASSWORD_LEN];
dengyihao's avatar
dengyihao 已提交
179 180
} STransUserMsg;

dengyihao's avatar
dengyihao 已提交
181 182
#pragma pack(pop)

dengyihao's avatar
dengyihao 已提交
183
/* Recover a pointer to the enclosing `type` from `ptr`, a pointer to its
 * member named `member`. Guarded so that an earlier definition from another
 * header is not redefined. */
#ifndef container_of
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#endif

/* Size of the per-request context.
 * The type was previously spelled `STranConnCtx`, which is not declared in
 * this header, so any use of the macro failed to compile. */
#define RPC_RESERVE_SIZE (sizeof(STransConnCtx))

/* Bytes added around the content of an RPC message: head plus digest. */
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))

/*
 * Conversions between a message (head followed by content) and its content.
 * Every argument is parenthesized so that compound expressions such as
 * `a | b` or `n << 2` are used as a whole.
 * `rpcContFromHead` performs plain pointer arithmetic on `msg`, so `msg`
 * must be a byte pointer (e.g. `char*`) for the result to land on the content.
 */
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)(cont) - sizeof(SRpcHead)))
#define rpcContFromHead(msg) ((msg) + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) ((contLen) + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) ((msgLen) - sizeof(SRpcHead))

/* Non-zero when the lowest bit of `type` is set. */
#define rpcIsReq(type) ((type) & 1U)

dengyihao's avatar
dengyihao 已提交
193 194
/* Size of the per-request context.
 * The type was previously spelled `STranConnCtx`, which is not declared in
 * this header, so any use of the macro failed to compile. */
#define TRANS_RESERVE_SIZE (sizeof(STransConnCtx))

/* Bytes added in front of the content of a transport message. */
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))

/*
 * Conversions between a message (head followed by content) and its content.
 * Every argument is parenthesized so that compound expressions are used as a
 * whole, and no macro ends in `;`, so each one is usable inside a larger
 * expression.
 * `transContFromHead` performs plain pointer arithmetic on `msg`, so `msg`
 * must be a byte pointer (e.g. `char*`) for the result to land on the content.
 */
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)(cont) - sizeof(STransMsgHead)))
#define transContFromHead(msg) ((msg) + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) ((contLen) + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) ((msgLen) - sizeof(STransMsgHead))

/* Non-zero when the lowest bit of `type` is set. */
#define transIsReq(type) ((type) & 1U)

dengyihao's avatar
dengyihao 已提交
202 203 204 205 206
/*
 * rpc-prefixed message helpers. Only the declarations are visible in this
 * header; the notes below are inferred from names and signatures.
 * NOTE(review): confirm return-value conventions and buffer ownership against
 * the definitions — in particular whether rpcDecompressRpcMsg() may return a
 * pointer different from `pHead`.
 */
int       rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void      rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
int32_t   rpcCompressRpcMsg(char* pCont, int32_t contLen);
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);

dengyihao's avatar
dengyihao 已提交
207 208 209 210 211
int  transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);

dengyihao's avatar
dengyihao 已提交
212 213
void transConnCtxDestroy(STransConnCtx* ctx);

dengyihao's avatar
dengyihao 已提交
214
void transFreeMsg(void* msg);
dengyihao's avatar
dengyihao 已提交
215 216

//
dengyihao's avatar
dengyihao 已提交
217 218 219 220
/*
 * Receive buffer attached to a connection; managed through
 * transInitBuffer(), transClearBuffer(), transDestroyBuffer(),
 * transAllocBuffer() and transReadComplete().
 * NOTE(review): the field meanings below are inferred from their names —
 * confirm against those functions.
 */
typedef struct SConnBuffer {
  char* buf;    // backing storage
  int   len;    // presumably the number of bytes currently held
  int   cap;    // presumably the allocated size of `buf`
  int   total;  // presumably the expected size of the complete message
} SConnBuffer;

dengyihao's avatar
dengyihao 已提交
224 225
/* Callback invoked with the uv_async_t handle it was registered for; this is
 * the callback type accepted by transCreateAsyncPool(). */
typedef void (*AsyncCB)(uv_async_t* handle);

dengyihao's avatar
dengyihao 已提交
226 227 228 229 230 231
/*
 * A message queue guarded by a mutex, together with an opaque thread pointer.
 * NOTE(review): presumably one item is attached to each uv_async_t of an
 * SAsyncPool — confirm in transCreateAsyncPool().
 */
typedef struct {
  void*           pThrd;  // opaque pointer; NOTE(review): presumably the owning worker thread — confirm
  queue           qmsg;   // head of the pending-message list
  pthread_mutex_t mtx;  // protect qmsg;
} SAsyncItem;

dengyihao's avatar
dengyihao 已提交
232 233 234 235 236 237
/*
 * A set of uv_async_t handles created by transCreateAsyncPool() and released
 * by transDestroyAsyncPool().
 * NOTE(review): `index` presumably selects the next handle to use in
 * transSendAsync() — confirm against its definition.
 */
typedef struct {
  int         index;
  int         nAsync;  // NOTE(review): presumably the number of entries in `asyncs` — confirm
  uv_async_t* asyncs;
} SAsyncPool;

dengyihao's avatar
dengyihao 已提交
238
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
dengyihao's avatar
dengyihao 已提交
239
void        transDestroyAsyncPool(SAsyncPool* pool);
dengyihao's avatar
dengyihao 已提交
240
int         transSendAsync(SAsyncPool* pool, queue* mq);
dengyihao's avatar
dengyihao 已提交
241

dengyihao's avatar
dengyihao 已提交
242 243 244 245 246
/*
 * SConnBuffer management. Only the declarations are visible in this header.
 * NOTE(review): the meaning of the int results (presumably 0 on success) and
 * the exact point at which transReadComplete() reports true must be confirmed
 * against the definitions.
 */
int  transInitBuffer(SConnBuffer* buf);
int  transClearBuffer(SConnBuffer* buf);
int  transDestroyBuffer(SConnBuffer* buf);
int  transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
dengyihao's avatar
dengyihao 已提交
247

dengyihao's avatar
dengyihao 已提交
248
/* Apply connection options to a libuv TCP handle.
 * NOTE(review): which options are set and what the int result means are not
 * visible in this header — confirm against the definition. */
int transSetConnOption(uv_tcp_t* stream);
dengyihao's avatar
dengyihao 已提交
249

dengyihao's avatar
dengyihao 已提交
250 251 252 253 254
/*
 * Ref/unref pairs for server-side and client-side handles.
 * NOTE(review): presumably reference counting on the opaque handle, with the
 * handle released when the last reference is dropped — confirm against the
 * definitions.
 */
void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);

void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
dengyihao's avatar
dengyihao 已提交
255

dengyihao's avatar
formate  
dengyihao 已提交
256 257
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
U
ubuntu 已提交
258
void transSendResponse(const STransMsg* pMsg);
dengyihao's avatar
formate  
dengyihao 已提交
259
int  transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
U
ubuntu 已提交
260 261 262 263

/*
 * Transport lifecycle API. Only the declarations are visible in this header.
 * NOTE(review): presumably the pointer returned by transInitServer() /
 * transInitClient() is the `arg` later passed to transCloseServer() /
 * transCloseClient(), and `fp` is a message callback — confirm against the
 * definitions, including what is returned on failure.
 */
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);

void transCloseClient(void* arg);
void transCloseServer(void* arg);
U
ubuntu 已提交
266

dengyihao's avatar
dengyihao 已提交
267
#endif