transComm.h 7.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV

#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"

typedef void* queue[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))

#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Initialize an empty queue. */
#define QUEUE_INIT(q)    \
  {                      \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  }

/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))

/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e)           \
  {                                \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  }

/* Remove the given element from the queue. Any element can be removed at any *
 * time. */
#define QUEUE_REMOVE(e)                 \
  {                                     \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  }
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

#define QUEUE_MOVE(h, n)        \
  do {                          \
    if (QUEUE_IS_EMPTY(h)) {    \
      QUEUE_INIT(n);            \
    } else {                    \
      queue* q = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, q, n);     \
    }                           \
  } while (0)
dengyihao's avatar
dengyihao 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104

/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))

/* Iterate over the element of a queue. * Mutating the queue while iterating
 * results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))

/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))

typedef struct {
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
  SRpcInfo* pRpc;     // associated SRpcInfo
  SEpSet    epSet;    // ip list provided by app
  void*     ahandle;  // handle provided by app
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
  int8_t   connType;  // connection type
  int64_t  rid;       // refId returned by taosAddRef
  SRpcMsg* pRsp;      // for synchronous API
  tsem_t*  pSem;      // for synchronous API
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
dengyihao's avatar
dengyihao 已提交
123 124
} SRpcReqContext;

dengyihao's avatar
dengyihao 已提交
125
typedef struct {
dengyihao's avatar
dengyihao 已提交
126 127 128
  SRpcInfo* pTransInst;  // associated SRpcInfo
  SEpSet    epSet;       // ip list provided by app
  void*     ahandle;     // handle provided by app
dengyihao's avatar
dengyihao 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
  // struct SRpcConn* pConn;     // pConn allocated
  tmsg_t   msgType;  // message type
  uint8_t* pCont;    // content provided by app
  int32_t  contLen;  // content length
  // int32_t  code;     // error code
  // int16_t  numOfTry;  // number of try for different servers
  // int8_t   oldInUse;  // server EP inUse passed by app
  // int8_t   redirect;  // flag to indicate redirect
  int8_t   connType;  // connection type
  int64_t  rid;       // refId returned by taosAddRef
  SRpcMsg* pRsp;      // for synchronous API
  tsem_t*  pSem;      // for synchronous API
  char*    ip;
  uint32_t port;
  // SEpSet*          pSet;      // for synchronous API
} STransConnCtx;

#pragma pack(push, 1)

typedef struct {
  char version : 4;  // RPC version
  char comp : 4;     // compression algorithm, 0:no compression 1:lz4
  char resflag : 2;  // reserved bits
  char spi : 3;      // security parameter index
  char encrypt : 3;  // encrypt algorithm, 0: no encryption

  uint32_t code;  // del later
  uint32_t msgType;
  int32_t  msgLen;
  uint8_t  content[0];  // message body starts from here
} STransMsgHead;

typedef struct {
  int32_t reserved;
  int32_t contLen;
} STransCompMsg;

typedef struct {
  uint32_t timeStamp;
  uint8_t  auth[TSDB_AUTH_LEN];
} STransDigestMsg;

#pragma pack(pop)

dengyihao's avatar
dengyihao 已提交
173
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
dengyihao's avatar
dengyihao 已提交
174
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
dengyihao's avatar
dengyihao 已提交
175

dengyihao's avatar
dengyihao 已提交
176
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
dengyihao's avatar
dengyihao 已提交
177 178 179 180 181 182
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)

dengyihao's avatar
dengyihao 已提交
183 184
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))

dengyihao's avatar
dengyihao 已提交
185
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
dengyihao's avatar
dengyihao 已提交
186 187 188 189 190 191
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)

dengyihao's avatar
dengyihao 已提交
192 193 194 195 196
int       rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void      rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
int32_t   rpcCompressRpcMsg(char* pCont, int32_t contLen);
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);

dengyihao's avatar
dengyihao 已提交
197 198 199 200 201
int  transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);

dengyihao's avatar
dengyihao 已提交
202 203
void transConnCtxDestroy(STransConnCtx* ctx);

dengyihao's avatar
dengyihao 已提交
204
void transFreeMsg(void* msg);
dengyihao's avatar
dengyihao 已提交
205 206 207 208 209 210 211
typedef struct SConnBuffer {
  char* buf;
  int   len;
  int   cap;
  int   left;
} SConnBuffer;

dengyihao's avatar
dengyihao 已提交
212
#endif