transComm.h 9.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
dengyihao's avatar
dengyihao 已提交
15 16
#ifndef _TD_TRANSPORT_COMM_H
#define _TD_TRANSPORT_COMM_H
dengyihao's avatar
dengyihao 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20 21
#ifdef __cplusplus
extern "C" {
#endif

dengyihao's avatar
dengyihao 已提交
22 23 24
#include <uv.h>
#include "os.h"
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
25
#include "theap.h"
dengyihao's avatar
dengyihao 已提交
26
#include "transLog.h"
dengyihao's avatar
dengyihao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
#include "transportInt.h"
#include "trpc.h"
#include "tutil.h"

/*
 * Intrusive circular doubly-linked list.
 *
 * A `queue` is a pair of links: slot 0 points to the next node and slot 1 to
 * the previous node. The list head is itself a node; an empty list is a head
 * whose two links point back at itself.
 *
 * All macros take pointers to `queue` and may evaluate their arguments more
 * than once, so arguments must be free of side effects.
 */
typedef void* queue[2];

/* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))

#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Initialize an empty queue. */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))

/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e)           \
  do {                             \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  } while (0)

/* Remove the given element from the queue. Any element can be removed at any
 * time. The removed element's own links are left untouched. */
#define QUEUE_REMOVE(e)                 \
  do {                                  \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  } while (0)

/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))

/* Split queue h at element q: q and everything after it move to head n,
 * everything before q stays on h. n does not need to be initialized. */
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

/* Move every element of h onto n, leaving h empty. n does not need to be
 * initialized. The local uses an unlikely name so that a caller variable
 * passed as h cannot be shadowed by it. */
#define QUEUE_MOVE(h, n)                      \
  do {                                        \
    if (QUEUE_IS_EMPTY(h)) {                  \
      QUEUE_INIT(n);                          \
    } else {                                  \
      queue* queueMoveFirst_ = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, queueMoveFirst_, n);     \
    }                                         \
  } while (0)

/* Iterate over the elements of queue e, with q as the cursor. Mutating the
 * queue while iterating results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))

/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))

97
/* Retry and connect tuning constants. */
#define TRANS_RETRY_COUNT_LIMIT 100  // retry count limit
#define TRANS_RETRY_INTERVAL    15   // retry interval, in milliseconds
#define TRANS_CONN_TIMEOUT      3    // connect timeout (NOTE(review): unit is not stated in this header — confirm at the use site)
dengyihao's avatar
dengyihao 已提交
100

dengyihao's avatar
formate  
dengyihao 已提交
101
typedef SRpcMsg      STransMsg;
dengyihao's avatar
dengyihao 已提交
102 103
typedef SRpcCtx      STransCtx;
typedef SRpcCtxVal   STransCtxVal;
dengyihao's avatar
formate  
dengyihao 已提交
104
typedef SRpcInfo     STrans;
U
ubuntu 已提交
105 106
typedef SRpcConnInfo STransHandleInfo;

dengyihao's avatar
dengyihao 已提交
107
typedef struct {
dengyihao's avatar
dengyihao 已提交
108 109 110
  SEpSet  epSet;     // ip list provided by app
  void*   ahandle;   // handle provided by app
  tmsg_t  msgType;   // message type
dengyihao's avatar
dengyihao 已提交
111
  int8_t  connType;  // connection type cli/srv
dengyihao's avatar
dengyihao 已提交
112 113
  int64_t rid;       // refId returned by taosAddRef

dengyihao's avatar
dengyihao 已提交
114
  int8_t     retryCount;
dengyihao's avatar
dengyihao 已提交
115 116 117
  STransCtx  appCtx;  //
  STransMsg* pRsp;    // for synchronous API
  tsem_t*    pSem;    // for synchronous API
dengyihao's avatar
dengyihao 已提交
118

dengyihao's avatar
dengyihao 已提交
119
  int hThrdIdx;
dengyihao's avatar
dengyihao 已提交
120 121 122 123 124 125
} STransConnCtx;

#pragma pack(push, 1)

typedef struct {
  char version : 4;  // RPC version
dengyihao's avatar
dengyihao 已提交
126 127 128 129
  char comp : 2;     // compression algorithm, 0:no compression 1:lz4
  char noResp : 2;   // noResp bits, 0: resp, 1: resp
  char persist : 2;  // persist handle,0: no persit, 1: persist handle
  char release : 2;
dengyihao's avatar
dengyihao 已提交
130
  char secured : 2;
dengyihao's avatar
dengyihao 已提交
131
  char spi : 2;
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133
  char     user[TSDB_UNI_LEN];
dengyihao's avatar
dengyihao 已提交
134 135
  uint64_t ahandle;  // ahandle assigned by client
  uint32_t code;     // del later
dengyihao's avatar
dengyihao 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
  uint32_t msgType;
  int32_t  msgLen;
  uint8_t  content[0];  // message body starts from here
} STransMsgHead;

/* Packed (declared inside the #pragma pack(push, 1) region).
 * NOTE(review): presumably the prefix written ahead of a compressed body — confirm against the compression code. */
typedef struct {
  int32_t reserved;  // not interpreted anywhere in this header
  int32_t contLen;   // NOTE(review): presumably the content length before compression — confirm
} STransCompMsg;

/* Packed (declared inside the #pragma pack(push, 1) region).
 * NOTE(review): presumably an authentication digest attached to a message — confirm against the auth code. */
typedef struct {
  uint32_t timeStamp;             // NOTE(review): unit and epoch are not visible in this header
  uint8_t  auth[TSDB_AUTH_LEN];   // digest bytes, TSDB_AUTH_LEN long
} STransDigestMsg;

dengyihao's avatar
dengyihao 已提交
151 152
typedef struct {
  uint8_t user[TSDB_UNI_LEN];
dengyihao's avatar
dengyihao 已提交
153
  uint8_t secret[TSDB_PASSWORD_LEN];
dengyihao's avatar
dengyihao 已提交
154 155
} STransUserMsg;

dengyihao's avatar
dengyihao 已提交
156 157
#pragma pack(pop)

dengyihao's avatar
dengyihao 已提交
158
/* Kinds of message handled internally by the transport layer. */
typedef enum {
  Normal = 0,
  Quit = 1,
  Release = 2,
  Register = 3,
} STransMsgType;
dengyihao's avatar
dengyihao 已提交
159
/* States a connection can be in. */
typedef enum {
  ConnNormal = 0,
  ConnAcquire = 1,
  ConnRelease = 2,
  ConnBroken = 3,
  ConnInPool = 4,
} ConnStatus;
dengyihao's avatar
dengyihao 已提交
160

dengyihao's avatar
dengyihao 已提交
161
/*
 * Given a pointer to the `member` field of a `type` object, yield a pointer to
 * the enclosing object. Uses offsetof, so <stddef.h> must be in scope.
 */
#define container_of(ptr, type, member) \
  ((type*)((char*)(ptr) - offsetof(type, member)))
dengyihao's avatar
dengyihao 已提交
162
/* Size of the per-message context struct (same value as TRANS_RESERVE_SIZE). */
#define RPC_RESERVE_SIZE (sizeof(STransConnCtx))

/* Non-zero when the lowest bit of the message type is set (same test as transIsReq). */
#define rpcIsReq(type) ((type) & 1U)

#define TRANS_RESERVE_SIZE (sizeof(STransConnCtx))

/*
 * Conversions between a message (header followed by content) and its content.
 * Pointer conversions work in bytes regardless of the argument's pointer type;
 * length conversions yield a size_t because sizeof is involved.
 */
#define TRANS_MSG_OVERHEAD           (sizeof(STransMsgHead))
#define transHeadFromCont(cont)      ((STransMsgHead*)((char*)(cont) - sizeof(STransMsgHead)))
#define transContFromHead(msg)       ((char*)(msg) + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) ((contLen) + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen)  ((msgLen) - sizeof(STransMsgHead))
#define transIsReq(type)             ((type) & 1U)
dengyihao's avatar
dengyihao 已提交
174

dengyihao's avatar
dengyihao 已提交
175 176 177 178 179 180 181 182
// int  rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int  transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
dengyihao's avatar
dengyihao 已提交
183

dengyihao's avatar
dengyihao 已提交
184
// Declared here, defined elsewhere.
// NOTE(review): presumably releases a message buffer; whether `msg` is the content pointer or the
// header pointer is not visible in this header — confirm against the implementation.
void transFreeMsg(void* msg);
dengyihao's avatar
dengyihao 已提交
185 186

//
dengyihao's avatar
dengyihao 已提交
187 188 189 190
/* Byte buffer used with the transInitBuffer / transAllocBuffer / transReadComplete family. */
typedef struct SConnBuffer {
  char* buf;    // backing storage
  int   len;    // NOTE(review): presumably the number of bytes currently held — confirm
  int   cap;    // NOTE(review): presumably the allocated size of buf — confirm
  int   total;  // NOTE(review): presumably the full expected message size — confirm
} SConnBuffer;

dengyihao's avatar
dengyihao 已提交
194 195
// Callback type taking a uv_async_t handle; this is the `cb` parameter type of transCreateAsyncPool.
typedef void (*AsyncCB)(uv_async_t* handle);

dengyihao's avatar
dengyihao 已提交
196
typedef struct {
dengyihao's avatar
dengyihao 已提交
197 198
  void*         pThrd;
  queue         qmsg;
wafwerar's avatar
wafwerar 已提交
199
  TdThreadMutex mtx;  // protect qmsg;
dengyihao's avatar
dengyihao 已提交
200 201
} SAsyncItem;

dengyihao's avatar
dengyihao 已提交
202 203 204 205 206 207
/* A set of uv_async_t handles; created by transCreateAsyncPool and released by transDestroyAsyncPool. */
typedef struct {
  int         index;   // NOTE(review): presumably the next handle to use — confirm
  int         nAsync;  // NOTE(review): presumably the number of entries in asyncs — confirm
  uv_async_t* asyncs;  // the handles
} SAsyncPool;

dengyihao's avatar
dengyihao 已提交
208
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
dengyihao's avatar
dengyihao 已提交
209
void        transDestroyAsyncPool(SAsyncPool* pool);
dengyihao's avatar
dengyihao 已提交
210
int         transSendAsync(SAsyncPool* pool, queue* mq);
dengyihao's avatar
dengyihao 已提交
211

dengyihao's avatar
dengyihao 已提交
212 213 214 215 216
// SConnBuffer API (defined elsewhere).
// NOTE(review): the meaning of the int return values is not visible in this header — confirm.
int  transInitBuffer(SConnBuffer* buf);
int  transClearBuffer(SConnBuffer* buf);
int  transDestroyBuffer(SConnBuffer* buf);
int  transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
dengyihao's avatar
dengyihao 已提交
217

dengyihao's avatar
dengyihao 已提交
218
// Defined elsewhere. NOTE(review): which socket options are applied to `stream`, and the meaning
// of the return value, are not visible in this header — confirm.
int transSetConnOption(uv_tcp_t* stream);
dengyihao's avatar
dengyihao 已提交
219

dengyihao's avatar
dengyihao 已提交
220 221 222 223 224
// Reference counting on server-side handles (defined elsewhere).
void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);

// Reference counting on client-side handles (defined elsewhere).
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
dengyihao's avatar
dengyihao 已提交
225

dengyihao's avatar
dengyihao 已提交
226 227 228
// Release a client-side / server-side handle (defined elsewhere).
// NOTE(review): how this differs from the Unref calls is not visible in this header — confirm.
void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle);

dengyihao's avatar
dengyihao 已提交
229 230
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
dengyihao's avatar
dengyihao 已提交
231 232
void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
dengyihao's avatar
formate  
dengyihao 已提交
233
int  transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
U
ubuntu 已提交
234 235 236 237

// Create a server / client transport instance (defined elsewhere); both share one signature.
// NOTE(review): the concrete type behind `fp`, `shandle` and the returned pointer is not visible
// in this header — confirm against the implementation.
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);

dengyihao's avatar
formate  
dengyihao 已提交
238 239
// Tear down a client / server transport instance (defined elsewhere).
// NOTE(review): `arg` is presumably the pointer returned by transInitClient / transInitServer — confirm.
void transCloseClient(void* arg);
void transCloseServer(void* arg);
U
ubuntu 已提交
240

dengyihao's avatar
dengyihao 已提交
241
/*
 * STransCtx API (defined elsewhere).
 * NOTE(review): ownership of the pointers returned by the Dump functions, and the difference
 * between Cleanup and Clear, are not visible in this header — confirm against the implementation.
 */
void  transCtxInit(STransCtx* ctx);
void  transCtxCleanup(STransCtx* ctx);
void  transCtxClear(STransCtx* ctx);
void  transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
dengyihao's avatar
dengyihao 已提交
247

dengyihao's avatar
dengyihao 已提交
248 249 250
// Queue of messages being sent.
typedef struct {
  SArray* q;                          // the queued items
  void (*freeFunc)(const void* arg);  // item destructor, supplied to transQueueInit
} STransQueue;

/*
 * init queue
 * note: the queue's size is small, default 1
 */
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));

/*
 * put arg into queue
 * returns false if the queue's size is greater than 1, true otherwise
 */
bool transQueuePush(STransQueue* queue, void* arg);
/*
 * the size of queue
 */
int32_t transQueueSize(STransQueue* queue);
/*
 * pop head from queue
 */
void* transQueuePop(STransQueue* queue);
/*
 * get the i-th item from queue
 */
void* transQueueGet(STransQueue* queue, int i);
/*
 * remove the i-th item from queue
 */
void* transQueueRm(STransQueue* queue, int i);
/*
 * queue empty or not
 */
bool transQueueEmpty(STransQueue* queue);
/*
 * clear queue
 */
void transQueueClear(STransQueue* queue);
/*
 * destroy queue
 */
void transQueueDestroy(STransQueue* queue);

dengyihao's avatar
dengyihao 已提交
294 295 296 297 298 299 300 301
/*
 * Delay queue based on the uv loop and a uv timer; only used for retry.
 */
/* A pair of opaque parameters. NOTE(review): presumably passed as a delayed task's argument — confirm. */
typedef struct STaskArg {
  void* param1;
  void* param2;
} STaskArg;

dengyihao's avatar
dengyihao 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314
/* One scheduled task: a callback, its argument and a heap node. */
typedef struct SDelayTask {
  void (*func)(void* arg);  // callback taking a single void* argument
  void*    arg;             // NOTE(review): presumably the value handed to func — confirm
  uint64_t execTime;        // NOTE(review): presumably when the task is due; unit and clock are not visible here
  HeapNode node;            // NOTE(review): presumably links the task into SDelayQueue.heap — confirm
} SDelayTask;

/* Delay queue state: a uv timer, a heap and the uv loop they belong to. */
typedef struct SDelayQueue {
  uv_timer_t* timer;  // NOTE(review): presumably armed for the earliest pending task — confirm
  Heap*       heap;   // NOTE(review): presumably orders SDelayTask entries by execTime — confirm
  uv_loop_t*  loop;   // the loop passed to transDQCreate
} SDelayQueue;

dengyihao's avatar
dengyihao 已提交
315
/*
 * Delay queue API (defined elsewhere).
 * transDQCreate hands the new queue back through `queue`; transDQSched registers `func(arg)` with
 * a timeout given in milliseconds.
 * NOTE(review): the meaning of the int return values is not visible in this header — confirm.
 */
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);

void transDQDestroy(SDelayQueue* queue);

int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
dengyihao's avatar
dengyihao 已提交
320

dengyihao's avatar
dengyihao 已提交
321 322 323 324
/*
 * init global func
 *
 * Declared with (void): in C an empty parameter list leaves the parameters
 * unspecified, so calls with stray arguments would not be diagnosed.
 */
void transThreadOnce(void);
dengyihao's avatar
dengyihao 已提交
325

dengyihao's avatar
dengyihao 已提交
326 327 328 329
#ifdef __cplusplus
}
#endif

dengyihao's avatar
dengyihao 已提交
330
#endif  // _TD_TRANSPORT_COMM_H