diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 97a0c905f8c3584fee0de46b064f79a78db19f15..6878d9f3b15bc74b07f2fa7f82ff868b878bd2e1 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -33,83 +33,42 @@ extern "C" { #define TAOS_SOCKET_TYPE_NAME_TCP "tcp" #define TAOS_SOCKET_TYPE_NAME_UDP "udp" -#define TAOS_ID_ASSIGNED 0 -#define TAOS_ID_FREE 1 -#define TAOS_ID_REALLOCATE 2 - #define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS) #define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC) -#define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL) -#define taosOpenRpcChann(x, y, z) taosOpenRpcChannWithQ(x,y,z,NULL) -#define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512) -#define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512) +extern int tsRpcHeadSize; typedef struct { char *localIp; // local IP used - uint16_t localPort; // local port + uint16_t localPort; // local port char *label; // for debug purpose int numOfThreads; // number of threads to handle connections - void *(*fp)(char *, void *, void *); // function to process the incoming msg - void *qhandle; // queue handle - int bits; // number of bits for sessionId - int numOfChanns; // number of channels - int sessionsPerChann; // number of sessions per channel - int idMgmt; // TAOS_ID_ASSIGNED, TAOS_ID_FREE + void *(*fp)(char type, char *pCont, int contLen, void *handle, int index); // function to process the incoming msg + int sessions; // number of sessions allowed int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled - int noFree; // not free buffer - void (*efp)(int cid); // call back function to process not activated chann - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, - uint8_t *ckey); // call back to retrieve auth info + char *meterId; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key + int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info } SRpcInit; typedef struct { - int cid; // channel ID - int sid; // session ID - char * meterId; // meter ID - uint32_t peerId; // peer link ID - void * shandle; // pointer returned by taosOpenRpc - void * ahandle; // handle provided by app - char * peerIp; // peer IP string - uint16_t peerPort; // peer port - char spi; // security parameter index - char encrypt; // encrypt algorithm - char * secret; // key for authentication - char * ckey; // ciphering key -} SRpcConnInit; - -extern int tsRpcHeadSize; - -void *taosOpenRpc(SRpcInit *pRpc); - -void taosCloseRpc(void *); - -int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle); - -void taosCloseRpcChann(void *handle, int cid); - -void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code); - -void taosCloseRpcConn(void *thandle); - -void taosStopRpcConn(void *thandle); - -int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle); - -char *taosBuildReqHeader(void *param, char type, char *msg); - -char *taosBuildReqMsgWithSize(void *, char type, int size); - -char *taosBuildRspMsgWithSize(void *, char type, int size); - -int taosSendSimpleRsp(void *thandle, char rsptype, char code); - -int taosSetSecurityInfo(int cid, int sid, char *id, int spi, int encrypt, char *secret, char *ckey); - -void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid); + int16_t index; + int16_t numOfIps; + uint32_t ip[TSDB_MAX_REPLICA]; +} SRpcIpSet; + +void *rpcOpen(SRpcInit *pRpc); +void rpcClose(void *); +char *rpcMallocCont(int contLen); +void rpcFreeCont(char *pCont); +void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); +void rpcSendResponse(void *pConn, void *pCont, int contLen); +void rpcSendSimpleRsp(void *pConn, int32_t code); -int taosGetOutType(void *thandle); #ifdef __cplusplus } diff --git a/src/rpc/inc/tcache.h b/src/rpc/inc/tcache.h new file mode 100644 index 0000000000000000000000000000000000000000..4c6acec096c01db64b09c4f0d18f404b8825f7b6 --- /dev/null +++ b/src/rpc/inc/tcache.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCCACHE_H +#define TDENGINE_TSCCACHE_H + +#ifdef __cplusplus +extern "C" { +#endif + +void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); + +void taosCloseConnCache(void *handle); + +void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); + +void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCACHE_H diff --git a/src/rpc/src/tcache.c b/src/rpc/src/tcache.c new file mode 100644 index 0000000000000000000000000000000000000000..666d069a58c936e9028b46f9e6244923ac4be993 --- /dev/null +++ b/src/rpc/src/tcache.c @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" + +#include "tglobalcfg.h" +#include "tlog.h" +#include "tmempool.h" +#include "tsclient.h" +#include "ttime.h" +#include "ttimer.h" +#include "tutil.h" + +typedef struct _c_hash_t { + uint32_t ip; + uint16_t port; + struct _c_hash_t *prev; + struct _c_hash_t *next; + void * data; + uint64_t time; +} SConnHash; + +typedef struct { + SConnHash ** connHashList; + mpool_h connHashMemPool; + int maxSessions; + int total; + int * count; + int64_t keepTimer; + pthread_mutex_t mutex; + void (*cleanFp)(void *); + void *tmrCtrl; + void *pTimer; +} SConnCache; + +int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { + SConnCache *pObj = (SConnCache *)handle; + int hash = 0; + // size_t user_len = strlen(user); + + hash = ip >> 16; + hash += (unsigned short)(ip & 0xFFFF); + hash += port; + while (*user != '\0') { + hash += *user; + user++; + } + + hash = hash % pObj->maxSessions; + + return hash; +} + +void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) { + if (pNode == NULL) return; + if (time < pObj->keepTimer + pNode->time) return; + + SConnHash *pPrev = pNode->prev, *pNext; + + while (pNode) { + (*pObj->cleanFp)(pNode->data); + pNext = pNode->next; + pObj->total--; + pObj->count[hash]--; + tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, + pObj->count[hash]); + taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); + pNode = pNext; + } + + if (pPrev) + pPrev->next = NULL; + else + pObj->connHashList[hash] = NULL; +} + +void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { + int hash; + SConnHash * pNode; + SConnCache *pObj; + + uint64_t time = taosGetTimestampMs(); + + pObj = (SConnCache *)handle; + if (pObj == NULL || pObj->maxSessions == 0) return NULL; + + if (data == NULL) { + tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port); + return NULL; + } + + hash = taosHashConn(pObj, ip, port, user); + pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); + pNode->ip = ip; + pNode->port = port; + pNode->data = data; + pNode->prev = NULL; + pNode->time = time; + + pthread_mutex_lock(&pObj->mutex); + + pNode->next = pObj->connHashList[hash]; + if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode; + pObj->connHashList[hash] = pNode; + + pObj->total++; + pObj->count[hash]++; + taosRemoveExpiredNodes(pObj, pNode->next, hash, time); + + pthread_mutex_unlock(&pObj->mutex); + + tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); + + return pObj; +} + +void taosCleanConnCache(void *handle, void *tmrId) { + int hash; + SConnHash * pNode; + SConnCache *pObj; + + pObj = (SConnCache *)handle; + if (pObj == NULL || pObj->maxSessions == 0) return; + if (pObj->pTimer != tmrId) return; + + uint64_t time = taosGetTimestampMs(); + + for (hash = 0; hash < pObj->maxSessions; ++hash) { + pthread_mutex_lock(&pObj->mutex); + pNode = pObj->connHashList[hash]; + taosRemoveExpiredNodes(pObj, pNode, hash, time); + pthread_mutex_unlock(&pObj->mutex); + } + + // tscTrace("timer, total connections in cache:%d", pObj->total); + taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); +} + +void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { + int hash; + SConnHash * pNode; + SConnCache *pObj; + void * pData = NULL; + + pObj = (SConnCache *)handle; + if (pObj == NULL || pObj->maxSessions == 0) return NULL; + + uint64_t time = taosGetTimestampMs(); + + hash = taosHashConn(pObj, ip, port, user); + pthread_mutex_lock(&pObj->mutex); + + pNode = pObj->connHashList[hash]; + while (pNode) { + if (time >= pObj->keepTimer + pNode->time) { + taosRemoveExpiredNodes(pObj, pNode, hash, time); + pNode = NULL; + break; + } + + if (pNode->ip == ip && pNode->port == port) break; + + pNode = pNode->next; + } + + if (pNode) { + taosRemoveExpiredNodes(pObj, pNode->next, hash, time); + + if (pNode->prev) { + pNode->prev->next = pNode->next; + } else { + pObj->connHashList[hash] = pNode->next; + } + + if (pNode->next) { + pNode->next->prev = pNode->prev; + } + + pData = pNode->data; + taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); + pObj->total--; + pObj->count[hash]--; + } + + pthread_mutex_unlock(&pObj->mutex); + + if (pData) { + tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]); + } + + return pData; +} + +void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { + SConnHash **connHashList; + mpool_h connHashMemPool; + SConnCache *pObj; + + connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); + if (connHashMemPool == 0) return NULL; + + connHashList = calloc(sizeof(SConnHash *), maxSessions); + if (connHashList == 0) { + taosMemPoolCleanUp(connHashMemPool); + return NULL; + } + + pObj = malloc(sizeof(SConnCache)); + if (pObj == NULL) { + taosMemPoolCleanUp(connHashMemPool); + free(connHashList); + return NULL; + } + memset(pObj, 0, sizeof(SConnCache)); + + pObj->count = calloc(sizeof(int), maxSessions); + pObj->total = 0; + pObj->keepTimer = keepTimer; + pObj->maxSessions = maxSessions; + pObj->connHashMemPool = connHashMemPool; + pObj->connHashList = connHashList; + pObj->cleanFp = cleanFp; + pObj->tmrCtrl = tmrCtrl; + taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); + + pthread_mutex_init(&pObj->mutex, NULL); + + return pObj; +} + +void taosCloseConnCache(void *handle) { + SConnCache *pObj; + + pObj = (SConnCache *)handle; + if (pObj == NULL || pObj->maxSessions == 0) return; + + pthread_mutex_lock(&pObj->mutex); + + taosTmrStopA(&(pObj->pTimer)); + + if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool); + + tfree(pObj->connHashList); + tfree(pObj->count) + + pthread_mutex_unlock(&pObj->mutex); + + pthread_mutex_destroy(&pObj->mutex); + + memset(pObj, 0, sizeof(SConnCache)); + free(pObj); +} diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index fe2ed91ce38762b1ba3c4790c520d6df5beaceaa..a4a93f14ea9e19b24674238eef1687baa91c697a 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -21,7 +21,7 @@ #include "tmd5.h" #include "tmempool.h" #include "trpc.h" -#include "taosdef.h" +#include "tsdb.h" #include "tsocket.h" #include "ttcpclient.h" #include "ttcpserver.h" @@ -31,78 +31,106 @@ #include "tutil.h" #include "lz4.h" -typedef struct _msg_node { - struct _msg_node *next; - void * ahandle; - int msgLen; -} SMsgNode; +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) +#define rpcHeaderFromCont(cont) ((STaosHeader *) (cont - sizeof(SRpcHeader))) +#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader)) +#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader)) +#define rpcIsReq(type) (type & 1U) typedef struct { - void * signature; - int chann; // channel ID - int sid; // session ID - uint32_t ownId; // own link ID - uint32_t peerId; // peer link ID - char meterId[TSDB_UNI_LEN]; - char spi; - char encrypt; - uint8_t secret[TSDB_KEY_LEN]; - uint8_t ckey[TSDB_KEY_LEN]; - uint16_t localPort; // for UDP only - uint32_t peerUid; - uint32_t peerIp; // peer IP - uint16_t peerPort; // peer port - char peerIpstr[20]; // peer IP string - uint16_t tranId; // outgoing transcation ID, for build message - uint16_t outTranId; // outgoing transcation ID - uint16_t inTranId; - uint8_t outType; - char inType; - char closing; - char rspReceived; - void * chandle; // handle passed by TCP/UDP connection layer - void * ahandle; // handle returned by upper app layter - int retry; - int tretry; // total retry - void * pTimer; - void * pIdleTimer; - char * pRspMsg; - char * pQuickRsp; - int rspMsgLen; - SMsgNode * pMsgNode; - SMsgNode * pHead, *pTail; - struct rpc_server *pServer; + int sessions; + int numOfThreads; + int type; + int idleTime; // milliseconds; + uint16_t localPort; + char label[12]; + + char *meterId; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key + + void *(*fp)(char *, void *ahandle, void *thandle); // FP to call the application + int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info + SRpcConn *connList; + void *idPool; + void *tmrCtrl; + void *hash; + void *shandle; // returned handle from lower layer during initialization + void *pCache; // connection cache + pthread_mutex_t mutex; +} SRpcInfo; + +typedef struct { + void *signature; + int sid; // session ID + uint32_t ownId; // own link ID + uint32_t peerId; // peer link ID + char meterId[TSDB_UNI_LEN]; + char spi; + char encrypt; + uint8_t secret[TSDB_KEY_LEN]; + uint8_t ckey[TSDB_KEY_LEN]; + uint16_t localPort; // for UDP only + uint32_t peerUid; + uint32_t peerIp; // peer IP + uint16_t peerPort; // peer port + char peerIpstr[20]; // peer IP string + uint16_t tranId; // outgoing transcation ID, for build message + uint16_t outTranId; // outgoing transcation ID + uint16_t inTranId; + uint8_t outType; + char inType; + void *chandle; // handle passed by TCP/UDP connection layer + void *ahandle; // handle provided by upper app layter + int retry; + int tretry; // total retry + void *pTimer; + void *pIdleTimer; + char *pRspMsg; // including header + int rspMsgLen; + char *pReqMsg; // including header + int reqMsgLen; + SRpcInfo *pRpc; } SRpcConn; typedef struct { - int sessions; - void * qhandle; // for scheduler - SRpcConn * connList; - void * idPool; - void * tmrCtrl; - void * hash; - pthread_mutex_t mutex; -} SRpcChann; - -typedef struct rpc_server { - void *shandle; // returned handle from lower layer during initialization - void *qhandle; // for scheduler - int bits; // number of bits for session ID - int mask; - int numOfChanns; - int numOfThreads; - int idMgmt; // ID management method - int type; - int idleTime; // milliseconds; - int noFree; // do not free the request msg when rsp is received - int index; // for UDP server, next thread for new connection - uint16_t localPort; - char label[12]; - void *(*fp)(char *, void *ahandle, void *thandle); - void (*efp)(int); // FP to report error - int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info - SRpcChann *channList; -} STaosRpc; + SRpcIpSet ipSet; + void *ahandle; + SRpcInfo *pRpc; + char type; + char *pCont; + int contLen; + int numOfRetry; + char msg[]; +} SRpcReqContext; + +typedef struct { + char version : 4; + char comp : 4; + char tcp : 2; + char spi : 3; + char encrypt : 3; + uint16_t tranId; + uint32_t uid; // for unique ID inside a client + uint32_t sourceId; + + uint32_t destId; + uint32_t destIp; + char meterId[TSDB_UNI_LEN]; + uint16_t port; // for UDP only + char empty[1]; + uint8_t msgType; + int32_t msgLen; + uint8_t content[0]; +} SRpcHeader; + +typedef struct { + uint32_t timeStamp; + uint8_t auth[TSDB_AUTH_LEN]; +} SRpcDigest; int tsRpcProgressTime = 10; // milliseocnds @@ -111,13 +139,25 @@ int tsRpcMaxRetry; int tsRpcHeadSize; void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { - taosInitUdpServer, taosInitUdpClient, taosInitTcpServer, taosInitTcpClient}; + taosInitUdpServer, + taosInitUdpClient, + taosInitTcpServer, + taosInitTcpClient +}; -void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer, - taosCleanUpTcpClient}; +void (*taosCleanUpConn[])(void *thandle) = { + taosCleanUpUdpConnection, + taosCleanUpUdpConnection, + taosCleanUpTcpServer, + taosCleanUpTcpClient +}; int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = { - taosSendUdpData, taosSendUdpData, taosSendTcpServerData, taosSendTcpClientData}; + taosSendUdpData, + taosSendUdpData, + taosSendTcpServerData, + taosSendTcpClientData +}; void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { taosOpenUdpConnection, @@ -126,27 +166,30 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = taosOpenTcpClientConnection, }; -void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpServerConnection, taosCloseTcpClientConnection}; - -int taosReSendRspToPeer(SRpcConn *pConn); -void taosProcessTaosTimer(void *, void *); -void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, - void *chandle); -int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen); -void taosProcessSchedMsg(SSchedMsg *pMsg); -int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); -int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); - -static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) { - STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); - int32_t overhead = sizeof(int32_t) * 2; - int32_t finalLen = 0; +void (*taosCloseConn[])(void *chandle) = { + NULL, + NULL, + taosCloseTcpServerConnection, + taosCloseTcpClientConnection +}; + +int rpcResendRspToPeer(SRpcConn *pConn); +void rpcProcessRetryTimer(void *, void *); +void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); +int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen); +int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); + +static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHeader* pHeader = rpcHeaderFromCont(pCont); + int32_t overhead = sizeof(int32_t) * 2; + int32_t finalLen = 0; if (!NEEDTO_COMPRESSS_MSG(contLen)) { return contLen; } - char *buf = malloc (contLen + overhead + 8); // 16 extra bytes + char *buf = malloc (contLen + overhead+8); // 16 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); return contLen; @@ -181,131 +224,97 @@ static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) { return finalLen; } -static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) { +static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) { int overhead = sizeof(int32_t) * 2; + SRpcHeader *pNewHeader = NULL; + char *pCont = pHeader->content; + + if (pHeader->comp) { + // decompress the content + assert(GET_INT32_VAL(pHeader->content) == 0); - if (pHeader->comp == 0) { - pSchedMsg->msg = (char *)(&(pHeader->destId)); - return pHeader; - } - - // decompress the content - assert(GET_INT32_VAL(pHeader->content) == 0); - - // contLen is original message length before compression applied - int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t))); - - // prepare the temporary buffer to decompress message - char *buf = malloc(sizeof(STaosHeader) + contLen); + // contLen is original message length before compression applied + int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t))); - //tDump(pHeader->content, msgLen); + // prepare the temporary buffer to decompress message + char *buf = rpcMallocCont(contLen); - if (buf) { - int32_t originalLen = LZ4_decompress_safe((const char*)(pHeader->content + overhead), buf + sizeof(STaosHeader), - msgLen - overhead, contLen); + if (buf) { + pNewHeader = rpcHeaderFromCont(buf); + int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead; + int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); + assert(originalLen == contLen); - memcpy(buf, pHeader, sizeof(STaosHeader)); - free(pHeader); // free the compressed message buffer - - STaosHeader* pNewHeader = (STaosHeader *) buf; - pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg); - assert(originalLen == contLen); - - pSchedMsg->msg = (char *)(&(pNewHeader->destId)); - //tDump(pHeader->content, contLen); - return pNewHeader; - } else { - tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); - pSchedMsg->msg = NULL; + memcpy(pNewHeader, pHeader, sizeof(SRpcHeader)); + pNewHeader->msgLen = rpcMsgLenFromCont(originalLen); + free(pHeader); // free the compressed message buffer + pHeader = pNewHeader; + } else { + tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); + } } - return NULL; + return pHeader; } -char *taosBuildReqHeader(void *param, char type, char *msg) { - STaosHeader *pHeader; - SRpcConn * pConn = (SRpcConn *)param; +char *rpcMallocCont(int size) { + char *pMsg = NULL; - if (pConn == NULL || pConn->signature != pConn) { - tError("pConn:%p, connection has to be openned first before building a message", pConn); + size += RPC_MSG_OVERHEAD; + pMsg = (char *)calloc(1, (size_t)size); + if (pMsg == NULL) { + tError("failed to malloc msg, size:%d", size); return NULL; } - pHeader = (STaosHeader *)(msg + sizeof(SMsgNode)); - memset(pHeader, 0, sizeof(STaosHeader)); - pHeader->version = 1; - pHeader->comp = 0; - pHeader->msgType = type; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); - - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; - pHeader->port = 0; - - pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); + return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader); +} - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); +void rpcFreeCont(char *cont) { + char *msg = cont - sizeof(SRpcHeader); + free(msg); +} - return (char *)pHeader->content; +static void rpcFreeMsg(char *msg) { + msg -= sizeof(SRpcReqContext); + free(msg); } -char *taosBuildReqMsgWithSize(void *param, char type, int size) { - STaosHeader *pHeader; - char * pMsg; - SRpcConn * pConn = (SRpcConn *)param; +void rpcSendSimpleRsp(void *thandle, int_32 code) { + char *pMsg; + STaosRsp *pRsp; + int msgLen = sizeof(STaosRsp); - if (pConn == NULL || pConn->signature != pConn) { - tError("pConn:%p, connection has to be openned first before building a message", pConn); - return NULL; + if (thandle == NULL) { + tError("connection is gone, response could not be sent"); + return; } - size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest); - pMsg = (char *)malloc((size_t)size); - memset(pMsg, 0, (size_t)size); - pHeader = (STaosHeader *)(pMsg + sizeof(SMsgNode)); - pHeader->version = 1; - pHeader->msgType = type; - pHeader->spi = 0; - pHeader->tcp = 0; - pHeader->encrypt = 0; - pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); - - pHeader->sourceId = pConn->ownId; - pHeader->destId = pConn->peerId; + pMsg = rpcMallocCont(msgLen); + if (pMsg == NULL) return; - pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); + pRsp = (STaosRsp *)pMsg; + pRsp->code = htonl(code); - memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + taosSendResponse(thandle, pMsg, msgLen); - return (char *)pHeader->content; + return; } -char *taosBuildRspMsgWithSize(void *param, char type, int size) { - STaosHeader *pHeader; - char * pMsg; - SRpcConn * pConn = (SRpcConn *)param; - - if (pConn == NULL || pConn->signature != pConn) { - tError("pConn:%p, connection has to be opened first before building a message", pConn); - return NULL; - } +static void rpcSendQuickRsp(SRpcConn *pConn, char code) { + char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)]; + SRpcHeader *pHeader; + int msgLen; - size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest); - pMsg = (char *)malloc((size_t)size); - if (pMsg == NULL) { - tError("pConn:%p, malloc(%d) failed when building a type:%d message", pConn, size, type); - return NULL; - } + pRsp = (STaosRsp *)rpcContFromHeader(msg); + pRsp->code = htonl(code); + msgLen = sizeof(STaosRsp); - memset(pMsg, 0, (size_t)size); - pHeader = (STaosHeader *)pMsg; + // set msg header + memset(msg, 0, sizeof(SRpcHeader)); + pHeader = (SRpcHeader *)msg; pHeader->version = 1; - pHeader->msgType = type; + pHeader->msgType = pConn->inType+1; pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; @@ -315,334 +324,240 @@ char *taosBuildRspMsgWithSize(void *param, char type, int size) { pHeader->uid = 0; memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); - return (char *)pHeader->content; + rpcSendDataToPeer(pConn, (char *)msg, msgLen); } -int taosSendSimpleRsp(void *thandle, char rsptype, char code) { - char *pMsg, *pStart; - int msgLen; - - if (thandle == NULL) { - tError("connection is gone, response could not be sent"); - return -1; - } - - pStart = taosBuildRspMsgWithSize(thandle, rsptype, 32); - if (pStart == NULL) { - tError("build rsp msg error, return null prt"); - return -1; - } - pMsg = pStart; - - *pMsg = code; - pMsg++; - - msgLen = (int)(pMsg - pStart); - taosSendMsgToPeer(thandle, pStart, msgLen); - - return msgLen; -} - -int taosSendQuickRsp(void *thandle, char rsptype, char code) { - char * pCont; - int contLen; - STaosHeader *pHeader; - char * msg; - int msgLen; - SRpcConn * pConn = (SRpcConn *)thandle; +void *rpcOpen(SRpcInit *pInit) { + SRpcInfo *pRpc; - pCont = taosBuildRspMsgWithSize(thandle, rsptype, 32); - if (pCont == NULL) return 0; - - *pCont = code; - contLen = 1; + tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; + tsRpcHeadSize = RPC_MSG_OVERHEAD; - pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); - msg = (char *)pHeader; - msgLen = contLen + (int32_t)sizeof(STaosHeader); + pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) return NULL; - if (pConn->spi) { - // add auth part - pHeader->spi = pConn->spi; - STaosDigest *pDigest = (STaosDigest *)(pCont + contLen); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(STaosDigest); - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - } else { - pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + strcpy(pRpc->label, pInit->label); + pRpc->fp = pInit->fp; + pRpc->type = pInit->connType; + pRpc->idleTime = pInit->idleTime; + pRpc->numOfThreads = pInit->numOfThreads; + if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) { + pRpc->numOfThreads = TSDB_MAX_RPC_THREADS; } - tfree(pConn->pQuickRsp); - pConn->pQuickRsp = msg; - taosSendDataToPeer(pConn, (char *)pHeader, msgLen); - - return msgLen; -} + pRpc->localPort = pInit->localPort; + pRpc->afp = pInit->afp; + pRpc->sessions = pInit->session; + strcpy(pRpc->meterId, pInit->meterId); + pRpc->spi = pInit->spi; + strcpy(pRpc->secret, pInit->secret); + strcpy(pRpc->ckey, pInit->ckey); + pRpc->afp = pInit->afp; + + pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, + taosProcessDataFromPeer, pRpc); + if (pRpc->shandle == NULL) { + tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); + taosCloseRpc(pRpc); + return NULL; + } -void *taosOpenRpc(SRpcInit *pRpc) { - STaosRpc *pServer; + size_t size = sizeof(SRpcConn) * sessions; + pRpc->connList = (SRpcConn *)calloc(1, size); + if (pRpc->connList == NULL) { + tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size); + taosCloseRpc(pRpc); + return NULL; + } - tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; - tsRpcHeadSize = sizeof(STaosHeader) + sizeof(SMsgNode); - - pServer = (STaosRpc *)malloc(sizeof(STaosRpc)); - if (pServer == NULL) return NULL; - memset(pServer, 0, sizeof(STaosRpc)); - - pServer->bits = pRpc->bits; - pServer->mask = (1 << (pRpc->bits)) - 1; - pServer->numOfChanns = pRpc->numOfChanns; - strcpy(pServer->label, pRpc->label); - pServer->fp = pRpc->fp; - pServer->idMgmt = pRpc->idMgmt; - pServer->type = pRpc->connType; - pServer->idleTime = pRpc->idleTime; - pServer->noFree = pRpc->noFree; - pServer->numOfThreads = pRpc->numOfThreads; - if (pServer->numOfThreads > TSDB_MAX_RPC_THREADS) { - pServer->numOfThreads = TSDB_MAX_RPC_THREADS; - pRpc->numOfThreads = TSDB_MAX_RPC_THREADS; + pRpc->idPool = taosInitIdPool(sessions); + if (pRpc->idPool == NULL) { + tError("%s failed to init ID pool", pRpc->label); + taosCloseRpc(pRpc); + return NULL; } - pServer->localPort = pRpc->localPort; - pServer->qhandle = pRpc->qhandle; - pServer->efp = pRpc->efp; - pServer->afp = pRpc->afp; - - int size = (int)sizeof(SRpcChann) * pRpc->numOfChanns; - pServer->channList = (SRpcChann *)malloc((size_t)size); - if (pServer->channList == NULL) { - tError("%s, failed to malloc channList", pRpc->label); - tfree(pServer); + + pRpc->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pRpc->label); + if (pRpc->tmrCtrl == NULL) { + tError("%s failed to init timers", pRpc->label); + taosCloseRpc(pRpc); return NULL; } - memset(pServer->channList, 0, (size_t)size); - pServer->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, - taosProcessDataFromPeer, pServer); - if (pServer->shandle == NULL) { - tError("%s, failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); - taosCloseRpc(pServer); + pRpc->hash = taosInitStrHash(sessions, sizeof(pRpc), taosHashString); + if (pRpc->hash == NULL) { + tError("%s failed to init string hash", pRpc->label); + taosCloseRpc(pRpc); return NULL; } - if (pServer->numOfChanns == 1) { - int retVal = taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann); - if (0 != retVal) { - tError("%s, failed to open rpc chann", pRpc->label); - taosCloseRpc(pServer); - return NULL; - } + pRpc->pCahche = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); + if ( pRpc->pCache == NULL ) { + tError("%s failed to init connection cache", pRpc->label); + taosCloseRpc(pRpc); + return NULL; } + pthread_mutex_init(&pRpc->mutex, NULL); + tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); - return pServer; + return pRpc; } -int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) { - STaosRpc * pServer = (STaosRpc *)handle; - SRpcChann *pChann; - - tTrace("cid:%d, handle:%p open rpc chann", cid, handle); - - if (pServer == NULL) return -1; - if (cid >= pServer->numOfChanns || cid < 0) { - tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns); - return -1; - } +void rpcClose(void *param) { + SRpcInfo *pRpc = (SRpcInfo *)param; - pChann = pServer->channList + cid; - memset(pChann, 0, sizeof(SRpcChann)); - - size_t size = sizeof(SRpcConn) * sessions; - pChann->connList = (SRpcConn *)calloc(1, size); - if (pChann->connList == NULL) { - tError("%s cid:%d, failed to allocate memory for taos connections, size:%d", pServer->label, cid, size); - return -1; - } + (*taosCleanUpConn[pRpc->type])(pRpc->shandle); - if (pServer->idMgmt == TAOS_ID_FREE) { - pChann->idPool = taosInitIdPool(sessions); - if (pChann->idPool == NULL) { - tError("%s cid:%d, failed to init ID pool", pServer->label, cid); - return -1; + for (int i = 0; i < pRpc->sessions; ++i) { + if (pRpc->connList[i].signature != NULL) { + taosCloseRpcConn((void *)(pRpc->connList + i)); } } - pChann->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pServer->label); - if (pChann->tmrCtrl == NULL) { - tError("%s cid:%d, failed to init timers", pServer->label, cid); - return -1; - } - - pChann->hash = taosInitStrHash(sessions, sizeof(pChann), taosHashString); - if (pChann->hash == NULL) { - tError("%s cid:%d, failed to init string hash", pServer->label, cid); - return -1; - } + taosCleanUpStrHash(pRpc->hash); + taosTmrCleanUp(pRpc->tmrCtrl); + taosIdPoolCleanUp(pRpc->idPool); + taosCloseConnCache(pRpc->pCache); - pthread_mutex_init(&pChann->mutex, NULL); - pChann->sessions = sessions; - - pChann->qhandle = qhandle ? qhandle : pServer->qhandle; - - return TSDB_CODE_SUCCESS; + tfree(pRpc->connList); + pthread_mutex_destroy(&pRpc->mutex); + tfree(pRpc); } -void taosCloseRpcChann(void *handle, int cid) { - STaosRpc * pServer = (STaosRpc *)handle; - SRpcChann *pChann; - - tTrace("cid:%d, handle:%p close rpc chann", cid, handle); +static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) { + SRpcConn *pConn; + SRpcInfo *pRpc = (SRpcInfo *)pInit->shandle; - if (pServer == NULL) return; - if (cid >= pServer->numOfChanns || cid < 0) { - tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns); - return; - } + if ( (uint8_t)(rpcGetConn(pInit->sid, pInit->meterId, pRpc, &pConn, 1, NULL)) != 0 ) + return NULL; - pChann = pServer->channList + cid; + if (pConn->peerId == 0) pConn->peerId = pRpc->peerId; + strcpy(pConn->peerIpstr, pInit->peerIp); + pConn->peerIp = inet_addr(pInit->peerIp); + pConn->peerPort = pInit->peerPort; + pConn->ahandle = pInit->ahandle; + pConn->spi = pRpc->spi; + pConn->encrypt = pRpc->encrypt; + if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); - for (int i = 0; i < pChann->sessions; ++i) { - if (pChann->connList[i].signature != NULL) { - taosCloseRpcConn((void *)(pChann->connList + i)); + // if it is client, it shall set up connection first + if (taosOpenConn[pRpc->type]) { + pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); + if (pConn->chandle) { + tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, + pConn, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); + } else { + tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, + pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort); + terrorno = TSDB_CODE_NETWORK_UNAVAIL; + rpcCloseConn(pConn); + pConn = NULL; } } - taosCleanUpStrHash(pChann->hash); - taosTmrCleanUp(pChann->tmrCtrl); - taosIdPoolCleanUp(pChann->idPool); - tfree(pChann->connList); - pthread_mutex_destroy(&pChann->mutex); - - memset(pChann, 0, sizeof(SRpcChann)); + return pConn; } -void taosCloseRpcConn(void *thandle) { +static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; if (pConn == NULL) return; - STaosRpc *pServer = pConn->pServer; - if (pConn->signature != thandle || pServer == NULL) return; - if (pConn->closing) return; - SRpcChann *pChann = pServer->channList + pConn->chann; + SRpcInfo *pRpc = pConn->pRpc; + if (pConn->signature != thandle || pRpc == NULL) return; - pthread_mutex_lock(&pChann->mutex); + pthread_mutex_lock(&pRpc->mutex); - pConn->closing = 1; pConn->signature = NULL; - if (taosCloseConn[pServer->type]) (*taosCloseConn[pServer->type])(pConn->chandle); + if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle); taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - tfree(pConn->pRspMsg); - - if (pServer->noFree == 0) free(pConn->pMsgNode); - pConn->pMsgNode = NULL; - - tfree(pConn->pQuickRsp); - - SMsgNode *pMsgNode; - while (pConn->pHead) { - pMsgNode = pConn->pHead; - pConn->pHead = pConn->pHead->next; - memset(pMsgNode, 0, sizeof(SMsgNode)); - if (pServer->noFree == 0) free(pMsgNode); - } + rpcFreeMsg(pConn->pRspMsg); + rpcFreeMsg(pConn-pReqMsg); char hashstr[40] = {0}; sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); - taosDeleteStrHash(pChann->hash, hashstr); + taosDeleteStrHash(pRpc->hash, hashstr); - tTrace("%s cid:%d sid:%d id:%s, TAOS connection closed, pConn:%p", pServer->label, pConn->chann, pConn->sid, + tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn->sid, pConn->meterId, pConn); int freeId = pConn->sid; memset(pConn, 0, sizeof(SRpcConn)); - if (pChann->idPool) taosFreeId(pChann->idPool, freeId); + if (pRpc->idPool) taosFreeId(pRpc->idPool, freeId); - pthread_mutex_unlock(&pChann->mutex); + pthread_mutex_unlock(&pRpc->mutex); } -int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcConn **ppConn, char req, char *hashstr) { +static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) { SRpcConn * pConn = NULL; - SRpcChann *pChann; - - if (pServer == NULL) return -1; - pChann = pServer->channList + chann; - - if (pServer->idMgmt == TAOS_ID_FREE) { - if (sid == 0) { - if (req) { - int osid = sid; - SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pChann->hash, hashstr); - if (ppConn) pConn = *ppConn; - if (pConn == NULL) { - sid = taosAllocateId(pChann->idPool); - if (sid <= 0) { - tError("%s cid:%d, maximum number of sessions:%d is reached", pServer->label, chann, pChann->sessions); - return TSDB_CODE_MAX_SESSIONS; - } else { - tTrace("%s cid:%d sid:%d, ID allocated, used:%d, old id:%d", pServer->label, chann, sid, - taosIdPoolNumOfUsed(pChann->idPool), osid); - } + + if (pRpc == NULL) return -1; + + if (sid == 0) { + if (req) { + int osid = sid; + SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pRpc->hash, hashstr); + if (ppConn) pConn = *ppConn; + if (pConn == NULL) { + sid = taosAllocateId(pRpc->idPool); + if (sid <= 0) { + tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); + return TSDB_CODE_MAX_SESSIONS; } else { - sid = pConn->sid; - tTrace("%s cid:%d sid:%d id:%s, session is already there", pServer->label, pConn->chann, pConn->sid, - pConn->meterId); + tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, + taosIdPoolNumOfUsed(pRpc->idPool), osid); } } else { - return TSDB_CODE_UNEXPECTED_RESPONSE; + sid = pConn->sid; + tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid, + pConn->meterId); } } else { - if (pChann->connList[sid].signature == NULL) { - tError("%s cid:%d, sid:%d session is already released", pServer->label, chann, sid); - return TSDB_CODE_INVALID_VALUE; - } - } - } + return TSDB_CODE_UNEXPECTED_RESPONSE; + } + } else { + if (pRpc->connList[sid].signature == NULL) { + tError("%s sid:%d session is already released", pRpc->label, sid); + return TSDB_CODE_INVALID_VALUE; + } + } - pConn = pChann->connList + sid; - if (pChann == NULL || pChann->connList == NULL) { - tTrace("%s cid:%d sid:%d, connlist is null, received:%s", pServer->label, chann, sid, meterId); - return TSDB_CODE_MISMATCHED_METER_ID; - } + pConn = pRpc->connList + sid; if (pConn->signature == NULL) { memset(pConn, 0, sizeof(SRpcConn)); pConn->signature = pConn; memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); - pConn->pServer = pServer; - pConn->chann = chann; + pConn->pRpc = pRpc; pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); - pConn->ownId = htonl((uint32_t)((pConn->chann << pServer->bits) + pConn->sid)); - if (pServer->afp) { - int ret = (*pServer->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); + pConn->ownId = htonl(pConn->sid); + if (pRpc->afp) { + int ret = (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); if (ret != 0) { - tWarn("%s cid:%d sid:%d id:%s, meterId not there pConn:%p", pServer->label, chann, sid, pConn->meterId, pConn); - taosFreeId(pChann->idPool, sid); // sid shall be released + tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn); + taosFreeId(pRpc->idPool, sid); // sid shall be released memset(pConn, 0, sizeof(SRpcConn)); return ret; } } - if ((pServer->type == TAOS_CONN_UDPC || pServer->type == TAOS_CONN_UDPS) && pServer->numOfThreads > 1 && - pServer->localPort) { + if ((pRpc->type == TAOS_CONN_UDPC || pRpc->type == TAOS_CONN_UDPS) && pRpc->numOfThreads > 1 && + pRpc->localPort) { // UDP server, assign to new connection - pServer->index = (pServer->index + 1) % pServer->numOfThreads; - pConn->localPort = (int16_t)(pServer->localPort + pServer->index); + pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads; + pConn->localPort = (int16_t)(pRpc->localPort + pRpc->index); } - taosAddStrHash(pChann->hash, hashstr, (char *)&pConn); - tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", pServer->label, chann, sid, - pConn->meterId, pConn->localPort, pConn); + taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); + tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid); } else { if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { - tTrace("%s cid:%d sid:%d id:%s, meterId is not matched, received:%s", pServer->label, chann, sid, pConn->meterId, - meterId); + tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); return TSDB_CODE_MISMATCHED_METER_ID; } } @@ -652,215 +567,159 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon return TSDB_CODE_SUCCESS; } -void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code) { - SRpcConn *pConn; - STaosRpc *pServer = (STaosRpc *)pInit->shandle; - - *code = (uint8_t)(taosGetRpcConn(pInit->cid, pInit->sid, pInit->meterId, pServer, &pConn, 1, NULL)); - if (*code == TSDB_CODE_MAX_SESSIONS) *code = TSDB_CODE_MAX_CONNECTIONS; - if (*code != TSDB_CODE_SUCCESS) return NULL; +static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { + SRpcHeader *pHeader = (SRpcHeader *)msg; + SRpcInfo *pRpc = pConn->pRpc; + int code = 0; - if (pConn->peerId == 0) pConn->peerId = pInit->peerId; + if (pConn->spi == 0 ) return 0; - strcpy(pConn->peerIpstr, pInit->peerIp); - pConn->peerIp = inet_addr(pInit->peerIp); - pConn->peerPort = pInit->peerPort; - pConn->ahandle = pInit->ahandle; - pConn->spi = pInit->spi; - pConn->encrypt = pInit->encrypt; - if (pConn->spi) memcpy(pConn->secret, pInit->secret, TSDB_KEY_LEN); + if (pHeader->spi == pConn->spi) { + // authentication + SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest)); - // if it is client, it shall set up connection first - if (taosOpenConn[pServer->type]) { - pConn->chandle = (*taosOpenConn[pServer->type])(pServer->shandle, pConn, pConn->peerIpstr, pConn->peerPort); - if (pConn->chandle) { - tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", pServer->label, - pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort, pConn); + int32_t delta; + delta = (int32_t)htonl(pDigest->timeStamp); + delta -= (int32_t)taosGetTimestampSec(); + if (abs(delta) > 900) { + tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn, + delta, htonl(pDigest->timeStamp)); + code = TSDB_CODE_INVALID_TIME_STAMP; } else { - tError("%s cid:%d sid:%d id:%s, failed to set up nw connection to ip:%s:%hu", pServer->label, pConn->chann, - pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort); - *code = TSDB_CODE_NETWORK_UNAVAIL; - taosCloseRpcConn(pConn); - pConn = NULL; + if (rpcAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + char ipstr[24]; + tinet_ntoa(ipstr, ip); + mLError("id:%s from %s, authentication failed", pHeader->meterId, ipstr); + tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn); + code = TSDB_CODE_AUTH_FAILURE; + } else { + pHeader->msgLen -= sizeof(SRpcDigest); + } + } + } else { + // if it is request or response with code 0, msg shall be discarded + if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) { + tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn); + code = TSDB_CODE_AUTH_FAILURE; } } - return pConn; -} - -void taosCloseRpc(void *param) { - STaosRpc *pServer = (STaosRpc *)param; - - (*taosCleanUpConn[pServer->type])(pServer->shandle); - - for (int cid = 0; cid < pServer->numOfChanns; ++cid) taosCloseRpcChann(pServer, cid); - - tfree(pServer->channList); - tfree(pServer); + return code; } -int taosSetSecurityInfo(int chann, int sid, char *id, int spi, int encrypt, char *secret, char *ckey) { - /* - SRpcConn *pConn; - - pConn = connList[chann*tsSessionsPerChann + sid]; +static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) { + int code = 0; - if ( pConn == NULL ) { - pConn = (SRpcConn *)sizeof(SRpcConn); - - if ( pConn == NULL ) { - tError("failed to allocate memory for taosConn"); - return -1; + if (pConn->peerId == 0) { + pConn->peerId = pHeader->sourceId; + } else { + if (pConn->peerId != pHeader->sourceId) { + tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, + pConn->peerId, pHeader->sourceId); + return TSDB_CODE_INVALID_VALUE; } - - memset(pConn, 0, sizeof(SRpcConn)); - pConn->chann = chann; - pConn->sid = sid; } - pConn->spi = spi; - pConn->encrypt = encrypt; - memcpy(pConn->secret, pConn->secret, TSDB_KEY_LEN); - memcpy(pConn->cipheringKey, ckey, TSDB_KEY_LEN); - memcpy(pConn->meterId, id, TSDB_TABLE_ID_LEN); - */ - return -1; -} - -int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { - int writtenLen = 0; - STaosRpc * pServer = pConn->pServer; - STaosHeader *pHeader = (STaosHeader *)data; - - if (pConn->signature != pConn || pServer == NULL) return -1; + if (pConn->inTranId == pHeader->tranId) { + if (pConn->inType == pHeader->msgType) { + tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]); + taosSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + } else if (pConn->inType == 0) { + tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn, + taosMsg[pHeader->msgType], pConn->inTranId); + rpcResendRspToPeer(pConn); + } else { + tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]); + } - if (pHeader->msgType & 1) { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", - pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, pConn); - } else { - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( - "%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p", - pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr, - pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, - pConn); - } + // do not reply any message + return TSDB_CODE_ALREADY_PROCESSED; + } - writtenLen = (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); + if (pConn->inType != 0) { + tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + pConn->inTranId, pHeader->tranId); + return TSDB_CODE_LAST_SESSION_NOT_FINISHED; + } - if (writtenLen != dataLen) - tError("%s cid:%d sid:%d id:%s, dataLen:%d writtenLen:%d, not good, reason:%s", pServer->label, pConn->chann, - pConn->sid, pConn->meterId, dataLen, writtenLen, strerror(errno)); - // assert ( writtenLen == dataLen ); - tDump(data, dataLen); + pConn->inTranId = pHeader->tranId; + pConn->inType = pHeader->msgType; - return 0; + return 0; } -void taosProcessResponse(SRpcConn *pConn) { - STaosHeader *pHeader; - char * msg = NULL; - int msgLen = 0; +static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) { + pConn->peerId = pHeader->sourceId; - if (pConn == NULL) return; - STaosRpc *pServer = pConn->pServer; - if (pConn->signature != pConn || pServer == NULL) return; - SRpcChann *pChann = pServer->channList + pConn->chann; - - pthread_mutex_lock(&pChann->mutex); - - pConn->outType = 0; - pConn->rspReceived = 0; - if (pServer->noFree == 0) tfree(pConn->pMsgNode); - pConn->pMsgNode = NULL; + if (pConn->outType == 0 || pConn->pContext == NULL) { + return TSDB_CODE_UNEXPECTED_RESPONSE; + } - if (pConn->pHead) { - SMsgNode *pMsgNode = pConn->pHead; - // assert ( pMsgNode->msgLen >= sizeof(STaosHeader) && pMsgNode->msgLen < RPC_MAX_UDP_SIZE); - if (pMsgNode->msgLen >= sizeof(STaosHeader)) { - pConn->pMsgNode = pMsgNode; - pConn->pHead = pMsgNode->next; - if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle; + if (pHeader->tranId != pConn->outTranId) { + return TSDB_CODE_INVALID_TRAN_ID; + } - pHeader = (STaosHeader *)((char *)pMsgNode + sizeof(SMsgNode)); - pConn->outType = pHeader->msgType; - pConn->outTranId = pHeader->tranId; + if (pHeader->msgType != pConn->outType + 1) { + return TSDB_CODE_INVALID_RESPONSE_TYPE; + } - msg = (char *)pHeader; - msgLen = pMsgNode->msgLen; + if (*pHeader->content == TSDB_CODE_NOT_READY) { + return = TSDB_CODE_ALREADY_PROCESSED; + } + taosTmrStopA(&pConn->pTimer); + pConn->retry = 0; + + if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) { + if (pConn->tretry <= tsRpcMaxRetry) { + tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn); + pConn->tretry++; + taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); + return TSDB_CODE_ALREADY_PROCESSED; } else { - tError("%s cid:%d sid:%d id:%s, invalid msgLen:%d pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pMsgNode->msgLen, pConn); - pConn->pHead = NULL; + // peer still in processing, give up + *pHeader->content = TSDB_CODE_TOO_SLOW; } - - if (pConn->pHead == NULL) pConn->pTail = NULL; - } - - if (msg) { - taosSendDataToPeer(pConn, msg, msgLen); - taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); } - pthread_mutex_unlock(&pChann->mutex); - + pConn->tretry = 0; + pConn->outType = 0; + pConn->pReqMsg = NULL; + pConn->pReqMsgLen = 0; } -int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip, +static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pRpc, int dataLen, uint32_t ip, uint16_t port, void *chandle) { - int chann, sid, code = 0; + int sid, code = 0; SRpcConn * pConn = NULL; - SRpcChann *pChann; int msgLen; char hashstr[40] = {0}; - // int reSend = 0; *ppConn = NULL; - uint32_t destId = htonl(pHeader->destId); - chann = destId >> pServer->bits; - sid = destId & pServer->mask; + uint32_t sid = htonl(pHeader->destId); if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) { - tTrace("%s cid:%d sid:%d, invalid message type:%d", pServer->label, chann, sid, pHeader->msgType); + tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType); return TSDB_CODE_INVALID_MSG_TYPE; } - msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); - if (dataLen != msgLen) { - tTrace("%s cid:%d sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pServer->label, chann, sid, + pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); + if (dataLen != pHeader->msgLen) { + tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, taosMsg[pHeader->msgType], dataLen, msgLen); return TSDB_CODE_INVALID_MSG_LEN; } - if (chann < 0 || chann >= pServer->numOfChanns) { - tTrace("%s cid:%d sid:%d, chann is out of range, max:%d, %s discarded", pServer->label, chann, sid, - pServer->numOfChanns, taosMsg[pHeader->msgType]); + if (sid < 0 || sid >= pRpc->sessions) { + tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, + pRpc->sessions, taosMsg[pHeader->msgType]); return TSDB_CODE_INVALID_SESSION_ID; } - pChann = pServer->channList + chann; - if (pChann->sessions == 0) { - tTrace("%s cid:%d, chann is not activated yet, %s discarded", pServer->label, chann, taosMsg[pHeader->msgType]); - if (pServer->efp) (*(pServer->efp))(chann); - return TSDB_CODE_NOT_ACTIVE_SESSION; - } - - if (sid < 0 || sid >= pChann->sessions) { - tTrace("%s cid:%d sid:%d, sid is out of range, max sid:%d, %s discarded", pServer->label, chann, sid, - pChann->sessions, taosMsg[pHeader->msgType]); - return TSDB_CODE_INVALID_SESSION_ID; - } - - // if ( pHeader->tcp ) return TSDB_CODE_ALREADY_PROCESSED; if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId); - pthread_mutex_lock(&pChann->mutex); - - code = taosGetRpcConn(chann, sid, pHeader->meterId, pServer, &pConn, pHeader->msgType & 1, hashstr); - if (code != TSDB_CODE_SUCCESS) goto _exit; + code = rpcGetConn(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr); + if (code != TSDB_CODE_SUCCESS) return code; *ppConn = pConn; sid = pConn->sid; @@ -873,175 +732,44 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer } if (pHeader->uid) pConn->peerUid = pHeader->uid; - if (port) pConn->peerPort = port; - if (pHeader->port) // port maybe changed by the peer pConn->peerPort = pHeader->port; - if (chandle) pConn->chandle = chandle; if (pHeader->tcp) { - tTrace("%s cid:%d sid:%d id:%s, content will be transfered via TCP pConn:%p", pServer->label, chann, sid, - pConn->meterId, pConn); - if (pConn->outType) taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); - code = TSDB_CODE_ALREADY_PROCESSED; - goto _exit; + tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn); + if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + return TSDB_CODE_ALREADY_PROCESSED; } - if (pConn->spi != 0) { - if (pHeader->spi == pConn->spi) { - // authentication - STaosDigest *pDigest = (STaosDigest *)((char *)pHeader + dataLen - sizeof(STaosDigest)); - - int32_t delta; - delta = (int32_t)htonl(pDigest->timeStamp); - delta -= (int32_t)taosGetTimestampSec(); - if (abs(delta) > 900) { - tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", pServer->label, - chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp)); - // the requirement of goldwind, should not return error in this case - code = TSDB_CODE_INVALID_TIME_STAMP; - goto _exit; - } - - if (taosAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - char ipstr[24]; - tinet_ntoa(ipstr, ip); - mLError("user:%s login from %s, authentication failed", pHeader->meterId, ipstr); - tError("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", pServer->label, chann, sid, - pConn->meterId, pConn); - code = TSDB_CODE_AUTH_FAILURE; - goto _exit; - } - } else { - // if it is request or response with code 0, msg shall be discarded - if ((pHeader->msgType & 1) || (pHeader->content[0] == 0)) { - tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", pServer->label, chann, sid, - pConn->meterId, pConn); - code = TSDB_CODE_AUTH_FAILURE; - goto _exit; - } - } - } + code = rpcCheckAuthentication(pConn, (char *)pHeader, dataLen); + if ( code != 0 ) return code; if (pHeader->msgType != TSDB_MSG_TYPE_REG && pHeader->encrypt) { // decrypt here } - pHeader->destId = pConn->ownId; // destId maybe 0, it shall be changed - - if (pHeader->msgType & 1) { - if (pConn->peerId == 0) { - pConn->peerId = pHeader->sourceId; - } else { - if (pConn->peerId != pHeader->sourceId) { - tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", pServer->label, chann, - sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn); - code = TSDB_CODE_INVALID_VALUE; - goto _exit; - } - } - - if (pConn->inTranId == pHeader->tranId) { - if (pConn->inType == pHeader->msgType) { - tTrace("%s cid:%d sid:%d id:%s, %s is retransmitted, pConn:%p", pServer->label, chann, sid, pConn->meterId, - taosMsg[pHeader->msgType], pConn); - taosSendQuickRsp(pConn, (char)(pHeader->msgType + 1), TSDB_CODE_ACTION_IN_PROGRESS); - } else if (pConn->inType == 0) { - tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", pServer->label, chann, sid, - pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn); - taosReSendRspToPeer(pConn); - } else { - tTrace("%s cid:%d sid:%d id:%s, mismatched message %s and tranId pConn:%p", pServer->label, chann, sid, - pConn->meterId, taosMsg[pHeader->msgType], pConn); - } - - // do not reply any message - code = TSDB_CODE_ALREADY_PROCESSED; - goto _exit; - } - - if (pConn->inType != 0) { - tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", pServer->label, - chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn); - code = TSDB_CODE_LAST_SESSION_NOT_FINISHED; - goto _exit; - } - - pConn->inTranId = pHeader->tranId; - pConn->inType = pHeader->msgType; - - if (sid == 0) // send a response first - taosSendQuickRsp(pConn, (char)(pConn->inType + 1), TSDB_CODE_ACTION_IN_PROGRESS); - + if ( rpcIsReq(pHeader->msgType) ) { + code = rpcProcessReqHeader(pConn, pHeader); } else { - // response from taos - pConn->peerId = pHeader->sourceId; - - if (pConn->outType == 0) { - code = TSDB_CODE_UNEXPECTED_RESPONSE; - goto _exit; - } - - if (pHeader->tranId != pConn->outTranId) { - code = TSDB_CODE_INVALID_TRAN_ID; - goto _exit; - } - - if (pHeader->msgType != pConn->outType + 1) { - code = TSDB_CODE_INVALID_RESPONSE_TYPE; - goto _exit; - } - - if (*pHeader->content == TSDB_CODE_NOT_READY) { - code = TSDB_CODE_ALREADY_PROCESSED; - goto _exit; - } - - taosTmrStopA(&pConn->pTimer); - pConn->retry = 0; - - if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) { - if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid, - pHeader->meterId, pConn); - pConn->tretry++; - taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer); - code = TSDB_CODE_ALREADY_PROCESSED; - goto _exit; - } else { - // peer still in processing, give up - *pHeader->content = TSDB_CODE_TOO_SLOW; - } - } - - pConn->tretry = 0; - if (pConn->rspReceived) { - code = TSDB_CODE_UNEXPECTED_RESPONSE; - goto _exit; - } else { - pConn->rspReceived = 1; - } + code = rpcProcessRspHeader(pConn, pHeader); } -_exit: - pthread_mutex_unlock(&pChann->mutex); - - // if (reSend) taosReSendRspToPeer(pConn); - return code; } -int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) { - STaosHeader *pRecvHeader, *pReplyHeader; - char * pContent; +void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, void *chandle) { + SRpcHeader *pRecvHeader, *pReplyHeader; + char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)]; + STaosRsp *pRsp; uint32_t timeStamp; int msgLen; - pRecvHeader = (STaosHeader *)pMsg; - pReplyHeader = (STaosHeader *)pReply; + pRecvHeader = (SRpcHeader *)pMsg; + pReplyHeader = (SRpcHeader *)msg; + memset(msg, 0, sizeof(SRpcHeader)); pReplyHeader->version = pRecvHeader->version; pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1); pReplyHeader->tcp = 0; @@ -1052,424 +780,372 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) { pReplyHeader->destId = pRecvHeader->sourceId; memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId)); - pContent = (char *)pReplyHeader->content; - *pContent = (char)code; - pContent++; + pRsp = (STaosRsp *)pReplyHeader->content; + pRsp->code = htonl(code); + msgLen = sizeof(STaosRsp); + char *pContent = pRsp->more; if (code == TSDB_CODE_INVALID_TIME_STAMP) { // include a time stamp if client's time is not synchronized well timeStamp = taosGetTimestampSec(); memcpy(pContent, &timeStamp, sizeof(timeStamp)); - pContent += sizeof(timeStamp); + msgLen += sizeof(timeStamp); } - msgLen = (int)(pContent - pReply); pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); + (*taosSendData[pRpc->type])(ip, port, pReply, msgLen, chandle); - return msgLen; -} - -void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn) -{ - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + return; } -void taosProcessIdleTimer(void *param, void *tmrId) { +void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; if (pConn->signature != param) { tError("idle timer pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); return; } - STaosRpc * pServer = pConn->pServer; - SRpcChann *pChann = pServer->channList + pConn->chann; + SRpcInfo * pRpc = pConn->pRpc; if (pConn->pIdleTimer != tmrId) { - tTrace("%s cid:%d sid:%d id:%s, idle timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, tmrId, pConn); + tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId); return; } - int reportDisc = 0; - - pthread_mutex_lock(&pChann->mutex); - - tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann, - pConn->sid, pConn->meterId, pConn); - if (pConn->rspReceived == 0) { - pConn->rspReceived = 1; - reportDisc = 1; - } - - pthread_mutex_unlock(&pChann->mutex); - - if (reportDisc) taosReportDisconnection(pChann, pConn); + tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn); + rpcCloseConn(pConn); } -void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, +void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { - STaosHeader *pHeader; + SRpcHeader *pHeader = (SRpcHeader *)data; uint8_t code; - SRpcConn * pConn = (SRpcConn *)thandle; - STaosRpc * pServer = (STaosRpc *)shandle; + SRpcConn *pConn = (SRpcConn *)thandle; + SRpcInfo *pRpc = (SRpcInfo *)shandle; int msgLen; - char pReply[128]; - SSchedMsg schedMsg; - int chann, sid; - SRpcChann * pChann = NULL; tDump(data, dataLen); - if (ip == 0 && taosCloseConn[pServer->type]) { - // it means the connection is broken - if (pConn) { - pChann = pServer->channList + pConn->chann; - tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pConn); - pConn->rspReceived = 1; - pConn->chandle = NULL; - taosReportDisconnection(pChann, pConn); - } - tfree(data); + if (ip == 0 && taosCloseConn[pRpc->type] && pConn) { + // it means the connection is broken, it only happens for TCP + tTrace("%s pConn:%p, underlying link is gone%p", pRpc->label, pConn); + pContext->terrno = TSDB_CODE_NETWORK_UNAVAIL; + taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); return NULL; } - pHeader = (STaosHeader *)data; - msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen); - - code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle); - - pHeader->destId = htonl(pHeader->destId); - chann = pHeader->destId >> pServer->bits; - sid = pHeader->destId & pServer->mask; + pthread_mutex_lock(&pRpc->mutex); + code = rpcProcessHeader(pHeader, &pConn, pRpc, dataLen, ip, port, chandle); + pthread_mutex_unlock(&pRpc->mutex); - if (pConn && pServer->idleTime) { - SRpcChann *pChann = pServer->channList + pConn->chann; - taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer); + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { + tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + pRpc->label, pConn, taosMsg[pHeader->msgType], ip, port, code, + dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } - if (code == TSDB_CODE_ALREADY_PROCESSED) { - tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label, - chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId), - pHeader->tranId, pConn); - free(data); - return pConn; + if (pConn && pRpc->idleTime) { + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace( - "%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d source:0x%08x dest:0x%08x " - "tranId:%d pConn:%p", - pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0], - dataLen, pHeader->sourceId, htonl(pHeader->destId), pHeader->tranId, pConn); + if (code != TSDB_CODE_ALREADY_PROCESSED) { + if (code != 0) { // parsing error + if ( rpcIsReq(pHeader->msgType) ) { + taosSendErrorMsgToPeer(data, code, ip, port, chandle); + tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code); + } + } else { // parsing OK + rpcProcessIncomingMsg(pConn, pHeader); + } } - if (code != 0) { - // parsing error + if ( code != 0 ) free (data); + return pConn; +} - if (pHeader->msgType & 1U) { - memset(pReply, 0, sizeof(pReply)); - - msgLen = taosBuildErrorMsgToPeer(data, code, pReply); - (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle); - tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid, - pHeader->meterId, taosMsg[pHeader->msgType + 1], code, pConn); - } else { - tTrace("%s cid:%d sid:%d id:%s, %s is received, parsing error:%u pConn:%p", pServer->label, chann, sid, - pHeader->meterId, taosMsg[pHeader->msgType], code, pConn); - } +void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) { + SRpcInfo *pRpc = pConn->pRpc; + int msgLen = rpcContLenFromHeader(pHeader->msgLen); - free(data); + pHeader = rpcDecompressRpcMsg(pHeader); + + if ( rpcIsReq(msgType) ) { + (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn); } else { - // parsing OK + // it's a response + STaosRsp *pRsp = (STaosRsp *)msg; + int32_t code = htonl(pRsp->code); - // internal communication is based on TAOS protocol, a trick here to make it efficient - if (pHeader->spi) msgLen -= sizeof(STaosDigest); - msgLen -= (int)sizeof(STaosHeader); - pHeader->msgLen = msgLen + (int)sizeof(SIntMsg); + SRpcReqContext *pContext = pConn->pContext; + pConn->pContext = NULL; - if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { - schedMsg.msg = NULL; // connection shall be closed - } else { - pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen); - } + taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) { - tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid, - pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer); + if (code == TSDB_CODE_NOT_MASTER) { + pContext->terrno = code; + taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); + } else { + rpcFreeMsg(rpcGetMsgFromCont(pContext->cont)); // free the request msg + (*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle); } - - pChann = pServer->channList + pConn->chann; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); } - - return pConn; } -int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { - STaosHeader *pHeader; - SMsgNode * pMsgNode; - char * msg; - int msgLen = 0; - SRpcConn * pConn = (SRpcConn *)thandle; - STaosRpc * pServer; - SRpcChann * pChann; - uint8_t msgType; - - if (pConn == NULL) return -1; - if (pConn->signature != pConn) return -1; - - pServer = pConn->pServer; - pChann = pServer->channList + pConn->chann; - pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); - pHeader->destIp = pConn->peerIp; - msg = (char *)pHeader; +SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) { + SRpcInfo *pRpc = (SRpcInfo *)shandle; + + SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[index], pRpc->peerPort, pRpc->meterId); + + if ( pConn == NULL ) { + SRpcConnInit connInit; + memset(&connInit, 0, sizeof(connInit)); + connInit.sid = 0; + connInit.spi = pRpc->spi; + connInit.encrypt = pRpc->encrypt; + connInit.meterId = pRpc->user; + connInit.peerId = 0; + connInit.shandle = pRpc; + connInit.peerIp = ipstr; + connInit.peerPort = pRpc->peerPort; + pConn = rpcOpenConn(&connInit); + } - if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort; - - contLen = taosCompressRpcMsg(pCont, contLen); + return pConn; +} - msgLen = contLen + (int32_t)sizeof(STaosHeader); +int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { + SRpcHeader *pHeader = (SRpcHeader *)msg; if (pConn->spi) { // add auth part pHeader->spi = pConn->spi; - STaosDigest *pDigest = (STaosDigest *)(pCont + contLen); + SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(STaosDigest); + msgLen += sizeof(SRpcDigest); pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); - taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); } else { pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen); } - pthread_mutex_lock(&pChann->mutex); - msgType = pHeader->msgType; + return msgLen; +} - if ((msgType & 1U) == 0) { - // response - pConn->inType = 0; - tfree(pConn->pRspMsg); - pConn->pRspMsg = msg; - pConn->rspMsgLen = msgLen; +int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) { + int writtenLen = 0; + SRpcInfo *pRpc = pConn->pRpc; + SRpcHeader *pHeader = (SRpcHeader *)data; + int code = 0; - if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + dataLen = taosAddAuthPart(pConn, data, dataLen); + if ( rpcIsReq(pHeader->msgType)) { + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", + pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, + pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); } else { - // request - pMsgNode = (SMsgNode *)(pCont - sizeof(STaosHeader) - sizeof(SMsgNode)); - pMsgNode->msgLen = msgLen; - pMsgNode->next = NULL; - pMsgNode->ahandle = ahandle; - - if (pConn->outType) { - if (pConn->pTail) { - pConn->pTail->next = pMsgNode; - pConn->pTail = pMsgNode; - } else { - pConn->pTail = pMsgNode; - pConn->pHead = pMsgNode; - } - - tTrace("%s cid:%d sid:%d id:%s, msg:%s is put into queue pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, taosMsg[msgType], pConn); - msgLen = 0; + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort, + (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId); + } - } else { - assert(pConn->pMsgNode == NULL); - if (pConn->pMsgNode) { - tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", pServer->label, pConn->chann, - pConn->sid, pConn->meterId, pConn); - } + writtenLen = (*taosSendData[pRpc->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle); - pConn->outType = msgType; - pConn->outTranId = pHeader->tranId; - pConn->pMsgNode = pMsgNode; - pConn->rspReceived = 0; - if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle; - } + if (writtenLen != dataLen) { + tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, + dataLen, writtenLen, strerror(errno)); + code = -1; } + + tDump(data, dataLen); - if (msgLen) { - taosSendDataToPeer(pConn, (char *)pHeader, msgLen); - if (msgType & 1U) { - taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); - } + return code; +} + +void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) { + + char *pHeader = rpcHeaderFromCont(pContext->pCont); + SRpcHeader *msg = (char *)pHeader; + int msgLen = rpcGetMsgLen(pContext->contLen); + char msgType = pContext->msgType; + + // set the message header + pHeader->version = 1; + pHeader->msgType = msgType; + pHeader->tcp = 0; + pHeader->encrypt = 0; + pConn->tranId++; + if ( pConn->tranId == 0 ) pConn->tranId++; + pHeader->tranId = pConn->tranId; + pHeader->sourceId = pConn->ownId; + pHeader->destId = pConn->peerId; + pHeader->port = 0; + pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); + memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + + // set the connection parameters + pConn->outType = msgType; + pConn->outTranId = pHeader->tranId; + pConn->pMsgNode = pMsgNode; + pConn->pReqMsg = msg; + pConn->reqMsgLen = msgLen; + pConn->context = pContext; + + if ( rpcSendDataToPeer(pConn, msg, msgLen) < 0 ) { + taosReportError(pConn->pContext, terrno); + } else { + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } +} + +void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int contLen, void *ahandle) { + SRpcConn *pConn; + SRpcReqContext *pContext; + + contLen = rpcCompressRpcMsg(pCont, contLen); + pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext)); + pContext->ahandle = ahandle; + pContext->pRpc = (SRpcInfo *)shandle; + pContext->ipSet = ipSet; + pContext->contLen = contLen + pContext->pCont = pCont; + pContext->type = type; + + pConn = rpcGetConnToServer(shandle, ipSet); + pContext->terrno = terrno; + if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); + + rpcSendReqToOneServer(pConn, pContext); + + return; +} + +void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) { + int msgLen = 0; + SRpcConn *pConn; + SRpcHeader *pHeader = rpcHeaderFromCont(pCont); + char *msg = (char *)pHeader; + + contLen = rpcCompressRpcMsg(pCont, contLen); + msgLen = rpcMsgLenFromCont(contLen); - pthread_mutex_unlock(&pChann->mutex); + pthread_mutex_lock(&pRpc->mutex); - return contLen; + // set msg header + pHeader->version = 1; + pHeader->msgType = pConn->inType+1; + pHeader->spi = 0; + pHeader->tcp = 0; + pHeader->encrypt = 0; + pHeader->tranId = pConn->inTranId; + pHeader->sourceId = pConn->ownId; + pHeader->destId = pConn->peerId; + pHeader->uid = 0; + memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId)); + + // set pConn parameters + pConn->inType = 0; + rpcFreeMsg(pConn->pRspMsg); + pConn->pRspMsg = msg; + pConn->rspMsgLen = msgLen; + + if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + + pthread_mutex_lock(&pRpc->mutex); + + rpcSendDataToPeer(pConn, msg, msgLen); + + return; } -int taosReSendRspToPeer(SRpcConn *pConn) { - STaosHeader *pHeader; - int writtenLen; - STaosRpc * pServer = pConn->pServer; +static void rpcResendRspToPeer(SRpcConn *pConn) { - if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0) { - tError("%s cid:%d sid:%d id:%s, rsp is null", pServer->label, pConn->chann, pConn->sid, pConn->meterId); - return -1; + if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0 || pConn->rspMsgLen <= sizeof(SRpcHeader)) { + tError("%s pConn:%p, rsp is null", pRpc->label); + return; } - pHeader = (STaosHeader *)pConn->pRspMsg; - if (pHeader->msgLen <= sizeof(SIntMsg) + 1 || pHeader->msgType <= 0) { - tError("%s cid:%d sid:%d id:%s, rsp is null, rspLen:%d, msgType:%d", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pHeader->msgLen, pHeader->msgType); - return -1; + SRpcHeader *pHeader = (SRpcHeader *)pConn->pRspMsg; + if (pHeader->msgType <= 0) { + tError("%s pConn:%p, msgType is messed up, rspLen:%d, msgType:%d", pRpc->label, pConn, pHeader->msgLen, pHeader->msgType); + return; } - writtenLen = - (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, pConn->pRspMsg, pConn->rspMsgLen, pConn->chandle); + rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); +} + +static void rpcProcessConnError(void *param, void *id) { + SRpcReqContext *pContext = (SRpcContext *)param; - if (writtenLen != pConn->rspMsgLen) { - tError("%s cid:%d sid:%d id:%s, failed to re-send %s, reason:%s pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, taosMsg[(int)pHeader->msgType], strerror(errno), pConn); + if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) { + char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp)); + if ( rsp ) { + STaosRsp *pRsp = (rsp+sizeof(SRpcHeader)); + pRsp->code = pContext->terrno; + (*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle); + } else { + tError("%s failed to malloc RSP", pRpc->label); + } } else { - tTrace("%s cid:%d sid:%d id:%s, msg:%s is re-sent to %s:%hu, len:%d pConn:%p", pServer->label, pConn->chann, - pConn->sid, pConn->meterId, taosMsg[(int)pHeader->msgType], pConn->peerIpstr, pConn->peerPort, - pConn->rspMsgLen, pConn); - } + // move to next IP + pContext->ipSet.index++; + pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; + + pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet); + pContext->terrno = terrno; + if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl); - return 0; + taosSendReqToOneServer(pConn, pContext); + } } -void taosProcessTaosTimer(void *param, void *tmrId) { - STaosHeader *pHeader = NULL; - SRpcConn * pConn = (SRpcConn *)param; - int msgLen; - int reportDisc = 0; +static void rpcProcessRetryTimer(void *param, void *tmrId) { + SRpcConn *pConn = (SRpcConn *)param; + int reportDisc = 0; if (pConn->signature != param) { tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); return; } - STaosRpc * pServer = pConn->pServer; - SRpcChann *pChann = pServer->channList + pConn->chann; + SRpcInfo *pRpc = pConn->pRpc; if (pConn->pTimer != tmrId) { - tTrace("%s cid:%d sid:%d id:%s, timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, tmrId, pConn); + tTrace("%s pConn:%p, timer:%p already processed%", pRpc->label, pConn); return; } - pthread_mutex_lock(&pChann->mutex); + pthread_mutex_lock(&pRpc->mutex); - if (pConn->rspReceived) { - tTrace("%s cid:%d sid:%d id:%s, rsp just received, pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pConn); - } else if (pConn->outType == 0) { - tTrace("%s cid:%d sid:%d id:%s, outtype is zero, pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pConn); + if (pConn->outType == 0) { + tTrace("%s pConn:%p, outtype is zero", pRpc->label, pConn); } else { - tTrace("%s cid:%d sid:%d id:%s, expected %s is not received, pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, taosMsg[(int)pConn->outType + 1], pConn); + tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; if (pConn->retry < 4) { - tTrace("%s cid:%d sid:%d id:%s, re-send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); - if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) { - pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode)); - pHeader->destId = pConn->peerId; - msgLen = pConn->pMsgNode->msgLen; - if (pConn->spi) { - STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest)); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - taosBuildAuthHeader((uint8_t *)pHeader, pConn->pMsgNode->msgLen - TSDB_AUTH_LEN, pDigest->auth, - pConn->secret); - } + tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, + taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); + if (pConn->pReqMsg && pConn->pReqMsgLen > 0) { + rpcSendDataToPeer(pConn, pReqMsg, pReqMsgLen); } } else { // close the connection - tTrace("%s cid:%d sid:%d id:%s, failed to send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann, - pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); - if (pConn->rspReceived == 0) { - pConn->rspReceived = 1; - reportDisc = 1; - } + tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, + taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); + reportDisc = 1; } } - if (pHeader) { - (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); - taosTmrReset(taosProcessTaosTimer, tsRpcTimer<retry, pConn, pChann->tmrCtrl, &pConn->pTimer); - } - - pthread_mutex_unlock(&pChann->mutex); - - if (reportDisc) taosReportDisconnection(pChann, pConn); -} - -void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) { - SRpcConn *pConn = (SRpcConn *)thandle; - - *peerId = pConn->peerId; - *peerIp = pConn->peerIp; - *peerPort = pConn->peerPort; - - *cid = pConn->chann; - *sid = pConn->sid; -} - -int taosGetOutType(void *thandle) { - SRpcConn *pConn = (SRpcConn *)thandle; - if (pConn == NULL) return -1; - - return pConn->outType; -} - -void taosProcessSchedMsg(SSchedMsg *pMsg) { - SIntMsg * pHeader = (SIntMsg *)pMsg->msg; - SRpcConn *pConn = (SRpcConn *)pMsg->thandle; - if (pConn == NULL || pConn->signature != pMsg->thandle || pConn->pServer == NULL) return; - STaosRpc *pRpc = pConn->pServer; - - void *ahandle = (*(pRpc->fp))(pMsg->msg, pMsg->ahandle, pMsg->thandle); - - if (ahandle == NULL || pMsg->msg == NULL) { - taosCloseRpcConn(pConn); - } else { - pConn->ahandle = ahandle; - if (pHeader && ((pHeader->msgType & 1) == 0)) taosProcessResponse(pConn); - } - - if (pMsg->msg) free(pMsg->msg - sizeof(STaosHeader) + sizeof(SIntMsg)); -} - -void taosStopRpcConn(void *thandle) { - SRpcConn * pConn = (SRpcConn *)thandle; - STaosRpc * pServer = pConn->pServer; - SRpcChann *pChann = pServer->channList + pConn->chann; - - tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid, - pConn->meterId, pConn); - - int reportDisc = 0; - pthread_mutex_lock(&pChann->mutex); - - if (pConn->outType) { - pConn->rspReceived = 1; - reportDisc = 1; - pthread_mutex_unlock(&pChann->mutex); - } else { - pthread_mutex_unlock(&pChann->mutex); - taosCloseRpcConn(pConn); - } + pthread_mutex_unlock(&pRpc->mutex); - if (reportDisc) taosReportDisconnection(pChann, pConn); + pConn->terrno = TSDB_CODE_NETWORK_UNAVAIL; + if (reportDisc) taosProcessConnError(pConn->pContext, NULL); } -int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { +static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { MD5_CTX context; int ret = -1; @@ -1484,7 +1160,7 @@ int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey return ret; } -int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { +static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { MD5_CTX context; MD5Init(&context); diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..9dc6d97eb6acce4077cc2e7b93c05a49839e68f7 --- /dev/null +++ b/src/util/inc/tbuffer.h @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#ifndef TDENGINE_TBUFFER_H +#define TDENGINE_TBUFFER_H + + +/* +SBuffer can be used to read or write a buffer, but cannot be used for both +read & write at a same time. Below is an example: + +int main(int argc, char** argv) { + //--------------------- write ------------------------ + SBuffer wbuf; + int32_t code = tbufBeginWrite(&wbuf); + if (code != 0) { + // handle errors + return 0; + } + + // reserve 1024 bytes for the buffer to improve performance + tbufEnsureCapacity(&wbuf, 1024); + + // write 5 integers to the buffer + for (int i = 0; i < 5; i++) { + tbufWriteInt32(&wbuf, i); + } + + // write a string to the buffer + tbufWriteString(&wbuf, "this is a string.\n"); + + // acquire the result and close the write buffer + size_t size = tbufTell(&wbuf); + char* data = tbufGetData(&wbuf, true); + tbufClose(&wbuf, true); + + + //------------------------ read ----------------------- + SBuffer rbuf; + code = tbufBeginRead(&rbuf, data, size); + if (code != 0) { + printf("you will see this message after print out 5 integers and a string.\n"); + tbufClose(&rbuf, false); + return 0; + } + + // read & print out 5 integers + for (int i = 0; i < 5; i++) { + printf("%d\n", tbufReadInt32(&rbuf)); + } + + // read & print out a string + printf(tbufReadString(&rbuf, NULL)); + + // try read another integer, this result in an error as there no this integer + tbufReadInt32(&rbuf); + + printf("you should not see this message.\n"); + tbufClose(&rbuf, false); + + return 0; +} +*/ +typedef struct { + jmp_buf jb; + char* data; + size_t pos; + size_t size; +} SBuffer; + + +// common functions can be used in both read & write +#define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) +size_t tbufTell(SBuffer* buf); +size_t tbufSeekTo(SBuffer* buf, size_t pos); +size_t tbufSkip(SBuffer* buf, size_t size); +void tbufClose(SBuffer* buf, bool keepData); + + +// basic read functions +#define tbufBeginRead(buf, data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) +char* tbufRead(SBuffer* buf, size_t size); +void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); +const char* tbufReadString(SBuffer* buf, size_t* len); +size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); + + +// basic write functions +#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) +void tbufEnsureCapacity(SBuffer* buf, size_t size); +char* tbufGetData(SBuffer* buf, bool takeOver); +void tbufWrite(SBuffer* buf, const void* data, size_t size); +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); +void tbufWriteString(SBuffer* buf, const char* str); + + +// read & write function for primitive types +#ifndef TBUFFER_DEFINE_FUNCTION +#define TBUFFER_DEFINE_FUNCTION(type, name) \ + type tbufRead##name(SBuffer* buf); \ + void tbufWrite##name(SBuffer* buf, type data); \ + void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); +#endif + +TBUFFER_DEFINE_FUNCTION( bool, Bool ) +TBUFFER_DEFINE_FUNCTION( char, Char ) +TBUFFER_DEFINE_FUNCTION( int8_t, Int8 ) +TBUFFER_DEFINE_FUNCTION( uint8_t, Unt8 ) +TBUFFER_DEFINE_FUNCTION( int16_t, Int16 ) +TBUFFER_DEFINE_FUNCTION( uint16_t, Uint16 ) +TBUFFER_DEFINE_FUNCTION( int32_t, Int32 ) +TBUFFER_DEFINE_FUNCTION( uint32_t, Uint32 ) +TBUFFER_DEFINE_FUNCTION( int64_t, Int64 ) +TBUFFER_DEFINE_FUNCTION( uint64_t, Uint64 ) +TBUFFER_DEFINE_FUNCTION( float, Float ) +TBUFFER_DEFINE_FUNCTION( double, Double ) + +#endif \ No newline at end of file diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c new file mode 100644 index 0000000000000000000000000000000000000000..ac7d22078d42eea60fde157ea364dee146203a2f --- /dev/null +++ b/src/util/src/tbuffer.c @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#define TBUFFER_DEFINE_FUNCTION(type, name) \ + type tbufRead##name(SBuffer* buf) { \ + type ret; \ + tbufReadToBuffer(buf, &ret, sizeof(type)); \ + return ret; \ + }\ + void tbufWrite##name(SBuffer* buf, type data) {\ + tbufWrite(buf, &data, sizeof(data));\ + }\ + void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\ + tbufWriteAt(buf, pos, &data, sizeof(data));\ + } + +#include "../inc/tbuffer.h" + + +//////////////////////////////////////////////////////////////////////////////// +// common functions + +size_t tbufTell(SBuffer* buf) { + return buf->pos; +} + +size_t tbufSeekTo(SBuffer* buf, size_t pos) { + if (pos > buf->size) { + // TODO: update error code, other tbufThrowError need to be changed too + tbufThrowError(buf, 1); + } + size_t old = buf->pos; + buf->pos = pos; + return old; +} + +size_t tbufSkip(SBuffer* buf, size_t size) { + return tbufSeekTo(buf, buf->pos + size); +} + +void tbufClose(SBuffer* buf, bool keepData) { + if (!keepData) { + free(buf->data); + } + buf->data = NULL; + buf->pos = 0; + buf->size = 0; +} + +//////////////////////////////////////////////////////////////////////////////// +// read functions + +char* tbufRead(SBuffer* buf, size_t size) { + char* ret = buf->data + buf->pos; + tbufSkip(buf, size); + return ret; +} + +void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { + assert(dst != NULL); + // always using memcpy, leave optimization to compiler + memcpy(dst, tbufRead(buf, size), size); +} + +const char* tbufReadString(SBuffer* buf, size_t* len) { + uint16_t l = tbufReadUint16(buf); + char* ret = buf->data + buf->pos; + tbufSkip(buf, l + 1); + ret[l] = 0; // ensure the string end with '\0' + if (len != NULL) { + *len = l; + } + return ret; +} + +size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { + assert(dst != NULL); + size_t len; + const char* str = tbufReadString(buf, &len); + if (len >= size) { + len = size - 1; + } + memcpy(dst, str, len); + dst[len] = 0; + return len; +} + + +//////////////////////////////////////////////////////////////////////////////// +// write functions + +void tbufEnsureCapacity(SBuffer* buf, size_t size) { + size += buf->pos; + if (size > buf->size) { + size_t nsize = size + buf->size; + char* data = realloc(buf->data, nsize); + if (data == NULL) { + tbufThrowError(buf, 2); + } + buf->data = data; + buf->size = nsize; + } +} + +char* tbufGetData(SBuffer* buf, bool takeOver) { + char* ret = buf->data; + if (takeOver) { + buf->pos = 0; + buf->size = 0; + buf->data = NULL; + } + return ret; +} + +void tbufEndWrite(SBuffer* buf) { + free(buf->data); + buf->data = NULL; + buf->pos = 0; + buf->size = 0; +} + +void tbufWrite(SBuffer* buf, const void* data, size_t size) { + assert(data != NULL); + tbufEnsureCapacity(buf, size); + memcpy(buf->data + buf->pos, data, size); + buf->pos += size; +} + +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { + assert(data != NULL); + // this function can only be called to fill the gap on previous writes, + // so 'pos + size <= buf->pos' must be true + assert(pos + size <= buf->pos); + memcpy(buf->data + pos, data, size); +} + +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { + // maximum string length is 65535, if longer string is required + // this function and the corresponding read function need to be + // revised. + assert(len <= 0xffff); + tbufWriteUint16(buf, (uint16_t)len); + tbufWrite(buf, str, len + 1); +} + +void tbufWriteString(SBuffer* buf, const char* str) { + tbufWriteStringLen(buf, str, strlen(str)); +} diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 5a7b605cb81c20bb8f530e30a0dc6a34688a92ec..92270a886f4cae4363e8c774bfa2622c3989b23b 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -1,4 +1,7 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) +cmake_minimum_required(VERSION 2.8) -ADD_SUBDIRECTORY(detail) \ No newline at end of file +project(tsdb) + +add_subdirectory(common) + +add_subdirectory(tsdb) \ No newline at end of file diff --git a/src/vnode/common/CMakeLists.txt b/src/vnode/common/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..38803c2421ffc97bbecf018d02b962228af9eb62 --- /dev/null +++ b/src/vnode/common/CMakeLists.txt @@ -0,0 +1,8 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +list(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/src/vnodePeer.c) + +message(STATUS "Common source file ${SOURCE_LIST}") + +add_library(common ${SOURCE_LIST}) +target_include_directories(common PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc") diff --git a/src/vnode/common/src/schema.c b/src/vnode/common/src/schema.c index cf0664c7cc0dee5542cc958921e01c60acb568ff..79b41533d23a0ef5ae8ace9da67e9416d5f604b0 100644 --- a/src/vnode/common/src/schema.c +++ b/src/vnode/common/src/schema.c @@ -51,7 +51,7 @@ SISchema tdConvertSchemaToInline(SSchema *pSchema) { char *pName = TD_ISCHEMA_COL_NAMES(pISchema); for (int32_t i = 0; i < totalCols; i++) { SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i); - char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i), i); + char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i)); TD_COLUMN_NAME(pCol) = pName; diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..4a88fbd7d65dfe6a327d7b5ad0348fb12dc079e4 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -0,0 +1,9 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +message(STATUS "tsdb source files: ${SOURCE_LIST}") + +add_library(tsdb STATIC ${SOURCE_LIST}) + +target_link_libraries(tsdb common) + +target_include_directories(tsdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc") \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 2117c951ca8b74d1d850d92fc2cd418146c32a76..b04f0148f06b27d8dd3b2e6d3e35519828eb2e08 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -8,7 +8,7 @@ #include #include -#include "cache.h" +// #include "cache.h" #include "schema.h" #define TSDB_VERSION_MAJOR 1 @@ -18,6 +18,21 @@ typedef void tsdb_repo_t; // use void to hide implementation details from ou typedef int32_t table_id_t; // table ID type in this repository typedef int16_t tsdb_id_t; // TSDB repository ID +// Submit message +typedef struct { + int32_t numOfTables; + char data[]; +} SSubmitMsg; + +// Submit message for one table +typedef struct { + table_id_t tableId; // table ID to insert + int32_t sversion; // data schema version + int32_t numOfRows; // number of rows data + int64_t uid; // table UID to insert + char data[]; +} SSubmitBlock; + // Retention policy. typedef struct { // TODO: Need a more fancy description @@ -54,7 +69,7 @@ typedef struct { SDataShardPolicy dataShardPolicy; SBlockRowsPolicy blockRowsPolicy; SRetentionPolicy retentionPlicy; // retention configuration - SCachePool * cachePool; // the cache pool the repository to use + void * cachePool; // the cache pool the repository to use } STSDBCfg; // the TSDB repository info @@ -205,6 +220,9 @@ typedef struct STimeWindow { int64_t ekey; } STimeWindow; +typedef struct { +} SColumnFilterInfo; + // query condition to build vnode iterator typedef struct STSDBQueryCond { STimeWindow twindow; @@ -237,6 +255,10 @@ typedef struct STableIDList { int32_t num; } STableIDList; +typedef struct { + +} SFields; + /** * Get the data block iterator, starting from position according to the query condition * @param pRepo the TSDB repository to query on diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index b0993c1e093be67e1267c3bc897f25cf40d50c01..049cdc0847d8da27f46fbf77d30d9d24fc3bc67a 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -3,7 +3,7 @@ #include -#include "cache.h" +// #include "cache.h" #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */ @@ -13,22 +13,21 @@ typedef struct { int32_t numOfRows // numOfRows } STableCacheInfo; -typedef struct { - char *pData; - STableCacheInfo *pTableInfo; - SCacheBlock *prev; - SCacheBlock *next; +typedef struct _tsdb_cache_block { + char * pData; + STableCacheInfo * pTableInfo; + struct _tsdb_cache_block *prev; + struct _tsdb_cache_block *next; } STSDBCacheBlock; // Use a doublely linked list to implement this typedef struct STSDBCache { // Number of blocks the cache is allocated - int32_t numOfBlocks; + int32_t numOfBlocks; STSDBCacheBlock *cacheList; - void * current; + void * current; } SCacheHandle; - // ---- Operation on STSDBCacheBlock #define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) #define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData)) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 91abea076f4551257ec83620833c00045a084fb2..02eb0c78813a7e89c326bfc6b99b3351b805ab67 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -1,7 +1,8 @@ #if !defined(_TD_TSDB_FILE_H_) #define _TD_TSDB_FILE_H_ -#include "tstring.h" +#include +// #include "tstring.h" typedef int32_t file_id_t; @@ -24,7 +25,7 @@ typedef struct { } SFileInfo; typedef struct { - tstring_t fname; + char * fname; SFileInfo fInfo; } SFILE; diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index 28bafa1dc8a8b426edca2e4a0397d23676d1b1ec..588394459e151f4fc03c0c5923dcf05082daeba5 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -4,7 +4,7 @@ #include -#include "taosdef.h" +// #include "taosdef.h" // Initially, there are 4 tables #define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4 @@ -30,7 +30,7 @@ typedef struct STable { // For TSDB_SUPER_TABLE, it is the schema including tags // For TSDB_NTABLE, it is only the schema, not including tags // For TSDB_STABLE, it is NULL - SVSchema *pSchema; + SSchema *pSchema; // Tag value for this table // For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL @@ -75,7 +75,7 @@ typedef struct { #define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData) #define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex) -SVSchema *tsdbGetTableSchema(STable *pTable); +SSchema *tsdbGetTableSchema(STable *pTable); // ---- Operation on SMetaHandle #define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables) diff --git a/src/vnode/tsdb/src/tsdb.c b/src/vnode/tsdb/src/tsdb.c index d0087945240e8ac83befb1a5fb45a7a4c3aa41ed..7e13e3183ad8ab33e9f975356359fd7d247f069c 100644 --- a/src/vnode/tsdb/src/tsdb.c +++ b/src/vnode/tsdb/src/tsdb.c @@ -2,14 +2,15 @@ #include #include -#include "taosdef.h" +// #include "taosdef.h" // #include "disk.h" +#include "tsdb.h" #include "tsdbCache.h" #include "tsdbMeta.h" typedef struct STSDBRepo { // TSDB configuration - STSDBcfg *pCfg; + STSDBCfg *pCfg; // The meter meta handle of this TSDB repository SMetaHandle *pMetaHandle; @@ -18,12 +19,12 @@ typedef struct STSDBRepo { SCacheHandle *pCacheHandle; // Disk tier handle for multi-tier storage - SDiskTier *pDiskTier; + void *pDiskTier; // File Store void *pFileStore; - pthread_mutext_t tsdbMutex; + pthread_mutex_t tsdbMutex; } STSDBRepo; diff --git a/src/vnode/tsdb/src/tsdbFileStore.c b/src/vnode/tsdb/src/tsdbFileStore.c index f6cc959f8f6dd24fb9ef4d65c23e5d8debf97929..a47f2eb1e48c85b47ab4e5adce262074d03942d6 100644 --- a/src/vnode/tsdb/src/tsdbFileStore.c +++ b/src/vnode/tsdb/src/tsdbFileStore.c @@ -1,6 +1,6 @@ #include "tsdbFile.h" char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type){ - char *suffix = tsdbFileSuffix[type]; + // char *suffix = tsdbFileSuffix[type]; // TODO } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 5bc82dfa103056e58abe3746460503d85b8e824c..9fad7b126931f40ccae7a3c3bf001e933c4d9115 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -1,6 +1,7 @@ #include -#include "taosdef.h" +// #include "taosdef.h" +#include "tsdb.h" #include "tsdbMeta.h" SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) { @@ -11,7 +12,7 @@ SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) { pMetahandle->numOfTables = 0; pMetahandle->numOfSuperTables = 0; - pMetahandle->pTables = calloc(sizeof(STable *) * numOfTables); + pMetahandle->pTables = calloc(sizeof(STable *), numOfTables); if (pMetahandle->pTables == NULL) { free(pMetahandle); return NULL;