/* transComm.h */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"

/*
 * Minimal intrusive, circular, doubly-linked queue.
 *
 * A queue node is an array of two pointers: slot 0 links to the next node and
 * slot 1 links to the previous node. An empty queue is a node whose links
 * point back at itself.
 */
typedef void* queue[2];

/* Private macros: lvalue access to the next/previous link of node q. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))

/* The "next" link of q's predecessor / the "prev" link of q's successor. */
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/*
 * The statement-like macros below are wrapped in do { ... } while (0) so that
 * each use followed by ';' is exactly one statement and can sit unbraced in
 * an if/else. Every use must therefore be terminated with ';'.
 */

/* Initialize an empty queue: both links of q point at q itself. */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Return true if the queue has no element (q's next link is q itself). */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))

/* Insert element e at the back of queue q (just before q in the ring). */
#define QUEUE_PUSH(q, e)           \
  do {                             \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  } while (0)

/*
 * Remove the given element from the queue by linking its neighbours to each
 * other. Any element can be removed at any time. The links stored in e itself
 * are left untouched.
 */
#define QUEUE_REMOVE(e)                 \
  do {                                  \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  } while (0)

/*
 * Split queue h at element q: q and every element after it up to the tail of
 * h are handed over to n, while h keeps the elements that preceded q.
 * Every link of n is overwritten, so n does not need to be initialized.
 */
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

/*
 * Move every element of h into n, leaving h empty. When h is already empty,
 * n is simply initialized as an empty queue.
 *
 * The temporary carries a deliberately unusual name: a plain `q` would
 * capture any caller argument that mentions a variable called `q`.
 */
#define QUEUE_MOVE(h, n)                      \
  do {                                        \
    if (QUEUE_IS_EMPTY(h)) {                  \
      QUEUE_INIT(n);                          \
    } else {                                  \
      queue* queueMoveFirst_ = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, queueMoveFirst_, n);     \
    }                                         \
  } while (0)

/* First element of the queue: the node linked after q. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Last element of the queue: the node linked before q. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))

/*
 * Walk the queue anchored at e, binding each node in turn to q, until the
 * walk arrives back at e. The queue must not be mutated while iterating;
 * doing so results in undefined behavior.
 */
#define QUEUE_FOREACH(q, e) \
  for ((q) = QUEUE_NEXT(e); \
       (q) != (e);          \
       (q) = QUEUE_NEXT(q))

/* Map queue node e back to the enclosing `type` whose member `field` is e. */
#define QUEUE_DATA(e, type, field) \
  ((type*)((void*)((char*)(e)-offsetof(type, field))))

typedef struct {
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
  SRpcInfo* pRpc;     // associated SRpcInfo
  SEpSet    epSet;    // ip list provided by app
  void*     ahandle;  // handle provided by app
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
  int8_t   connType;  // connection type
  int64_t  rid;       // refId returned by taosAddRef
  SRpcMsg* pRsp;      // for synchronous API
  tsem_t*  pSem;      // for synchronous API
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
123 124
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
125
typedef struct {
dengyihao's avatar
dengyihao 已提交
126 127 128
  SRpcInfo* pTransInst;  // associated SRpcInfo
  SEpSet    epSet;       // ip list provided by app
  void*     ahandle;     // handle provided by app
dengyihao's avatar
dengyihao 已提交
129 130 131 132 133 134 135 136
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
dengyihao's avatar
dengyihao 已提交
137 138 139 140 141 142
  int8_t  connType;  // connection type
  int64_t rid;       // refId returned by taosAddRef

  SRpcMsg* pRsp;  // for synchronous API
  tsem_t*  pSem;  // for synchronous API

dengyihao's avatar
dengyihao 已提交
143 144 145 146 147 148 149 150 151 152 153
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
} STransConnCtx;

#pragma pack(push, 1)

typedef struct {
  char version : 4;  // RPC version
  char comp : 4;     // compression algorithm, 0:no compression 1:lz4
  char resflag : 2;  // reserved bits
dengyihao's avatar
dengyihao 已提交
154 155
  char spi : 1;      // security parameter index
  char secured : 2;
dengyihao's avatar
dengyihao 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
  char encrypt : 3;  // encrypt algorithm, 0: no encryption

  uint32_t code;  // del later
  uint32_t msgType;
  int32_t  msgLen;
  uint8_t  content[0];  // message body starts from here
} STransMsgHead;

typedef struct {
  int32_t reserved;
  int32_t contLen;
} STransCompMsg;

typedef struct {
  uint32_t timeStamp;
  uint8_t  auth[TSDB_AUTH_LEN];
} STransDigestMsg;

dengyihao's avatar
dengyihao 已提交
174 175 176 177
typedef struct {
  uint8_t user[TSDB_UNI_LEN];
} STransUserMsg;

dengyihao's avatar
dengyihao 已提交
178 179
#pragma pack(pop)

dengyihao's avatar
dengyihao 已提交
180
/* Return the enclosing `type` object whose member `member` lives at `ptr`. */
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))

/*
 * Helpers for the rpc* message layout: an SRpcHead immediately precedes the
 * content. All macro parameters are parenthesized so that arguments such as
 * `a | b` or `n << 2` are evaluated as a unit.
 *
 * rpcContFromHead / transContFromHead do plain pointer arithmetic on `msg`,
 * so `msg` has to be a byte pointer (char*) for the offset to be in bytes.
 */
#define RPC_RESERVE_SIZE (sizeof(STransConnCtx))

#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)(cont) - sizeof(SRpcHead)))
#define rpcContFromHead(msg) ((msg) + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) ((contLen) + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) ((msgLen) - sizeof(SRpcHead))
#define rpcIsReq(type) ((type) & 1U)

/* Same helpers for the trans* layout, where the header is STransMsgHead. */
#define TRANS_RESERVE_SIZE (sizeof(STransConnCtx))

#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)(cont) - sizeof(STransMsgHead)))
#define transContFromHead(msg) ((msg) + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) ((contLen) + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) ((msgLen) - sizeof(STransMsgHead))
#define transIsReq(type) ((type) & 1U)

int       rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void      rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
int32_t   rpcCompressRpcMsg(char* pCont, int32_t contLen);
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);

dengyihao's avatar
dengyihao 已提交
204 205 206 207 208
int  transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);

dengyihao's avatar
dengyihao 已提交
209 210
void transConnCtxDestroy(STransConnCtx* ctx);

dengyihao's avatar
dengyihao 已提交
211
void transFreeMsg(void* msg);
dengyihao's avatar
dengyihao 已提交
212 213

// Buffer state handed to the trans*Buffer functions declared in this header.
// NOTE(review): the exact meaning of len / cap / left (bytes used, capacity,
// bytes still expected?) is not visible here — confirm against the users.
typedef struct SConnBuffer {
  char* buf;
  int   len;
  int   cap;
  int   left;
} SConnBuffer;

// Callback type taken by transCreateAsyncPool; it receives the uv_async_t handle.
typedef void (*AsyncCB)(uv_async_t* handle);

// A message queue together with the mutex that guards it.
// NOTE(review): pThrd is an untyped pointer; what it refers to is not visible here.
typedef struct {
  void*           pThrd;
  queue           qmsg;
  pthread_mutex_t mtx;  // protect qmsg;
} SAsyncItem;

typedef struct {
  int         index;
  int         nAsync;
  uv_async_t* asyncs;
} SAsyncPool;

SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
void        transDestroyAsyncPool(SAsyncPool* pool);
dengyihao's avatar
dengyihao 已提交
237
int         transSendAsync(SAsyncPool* pool, queue* mq);
dengyihao's avatar
dengyihao 已提交
238

dengyihao's avatar
dengyihao 已提交
239 240 241 242 243
// SConnBuffer operations. Only the declarations are visible in this header.
// NOTE(review): the meaning of the int return values is not shown here.
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);

// int transPackMsg(SRpcMsg *rpcMsg, bool secured, bool auth, char **msg, int32_t *msgLen);

// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );

#endif