提交 15e4b5ca 编写于 作者: S slguan

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

......@@ -33,83 +33,42 @@ extern "C" {
#define TAOS_SOCKET_TYPE_NAME_TCP "tcp"
#define TAOS_SOCKET_TYPE_NAME_UDP "udp"
#define TAOS_ID_ASSIGNED 0
#define TAOS_ID_FREE 1
#define TAOS_ID_REALLOCATE 2
#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS)
#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC)
#define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL)
#define taosOpenRpcChann(x, y, z) taosOpenRpcChannWithQ(x,y,z,NULL)
#define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512)
#define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512)
extern int tsRpcHeadSize;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
void *(*fp)(char *, void *, void *); // function to process the incoming msg
void *qhandle; // queue handle
int bits; // number of bits for sessionId
int numOfChanns; // number of channels
int sessionsPerChann; // number of sessions per channel
int idMgmt; // TAOS_ID_ASSIGNED, TAOS_ID_FREE
void *(*fp)(char type, char *pCont, int contLen, void *handle, int index); // function to process the incoming msg
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
int noFree; // not free buffer
void (*efp)(int cid); // call back function to process not activated chann
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret,
uint8_t *ckey); // call back to retrieve auth info
} SRpcInit;
typedef struct {
int cid; // channel ID
int sid; // session ID
char * meterId; // meter ID
uint32_t peerId; // peer link ID
void * shandle; // pointer returned by taosOpenRpc
void * ahandle; // handle provided by app
char * peerIp; // peer IP string
uint16_t peerPort; // peer port
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char * secret; // key for authentication
char * ckey; // ciphering key
} SRpcConnInit;
extern int tsRpcHeadSize;
void *taosOpenRpc(SRpcInit *pRpc);
void taosCloseRpc(void *);
int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle);
void taosCloseRpcChann(void *handle, int cid);
void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code);
void taosCloseRpcConn(void *thandle);
void taosStopRpcConn(void *thandle);
int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle);
char *taosBuildReqHeader(void *param, char type, char *msg);
char *taosBuildReqMsgWithSize(void *, char type, int size);
char *taosBuildRspMsgWithSize(void *, char type, int size);
int taosSendSimpleRsp(void *thandle, char rsptype, char code);
int taosSetSecurityInfo(int cid, int sid, char *id, int spi, int encrypt, char *secret, char *ckey);
char *secret; // key for authentication
char *ckey; // ciphering key
int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info
} SRpcInit;
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid);
typedef struct {
int16_t index;
int16_t numOfIps;
uint32_t ip[TSDB_MAX_REPLICA];
} SRpcIpSet;
void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *);
char *rpcMallocCont(int contLen);
void rpcFreeCont(char *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, void *pCont, int contLen);
void rpcSendSimpleRsp(void *pConn, int32_t code);
int taosGetOutType(void *thandle);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void taosCloseConnCache(void *handle);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCACHE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmempool.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
typedef struct _c_hash_t {
uint32_t ip;
uint16_t port;
struct _c_hash_t *prev;
struct _c_hash_t *next;
void * data;
uint64_t time;
} SConnHash;
typedef struct {
SConnHash ** connHashList;
mpool_h connHashMemPool;
int maxSessions;
int total;
int * count;
int64_t keepTimer;
pthread_mutex_t mutex;
void (*cleanFp)(void *);
void *tmrCtrl;
void *pTimer;
} SConnCache;
int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pObj = (SConnCache *)handle;
int hash = 0;
// size_t user_len = strlen(user);
hash = ip >> 16;
hash += (unsigned short)(ip & 0xFFFF);
hash += port;
while (*user != '\0') {
hash += *user;
user++;
}
hash = hash % pObj->maxSessions;
return hash;
}
void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL) return;
if (time < pObj->keepTimer + pNode->time) return;
SConnHash *pPrev = pNode->prev, *pNext;
while (pNode) {
(*pObj->cleanFp)(pNode->data);
pNext = pNode->next;
pObj->total--;
pObj->count[hash]--;
tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pObj->count[hash]);
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev)
pPrev->next = NULL;
else
pObj->connHashList[hash] = NULL;
}
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
uint64_t time = taosGetTimestampMs();
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (data == NULL) {
tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port);
return NULL;
}
hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
pNode->ip = ip;
pNode->port = port;
pNode->data = data;
pNode->prev = NULL;
pNode->time = time;
pthread_mutex_lock(&pObj->mutex);
pNode->next = pObj->connHashList[hash];
if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode;
pObj->connHashList[hash] = pNode;
pObj->total++;
pObj->count[hash]++;
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
pthread_mutex_unlock(&pObj->mutex);
tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
return pObj;
}
void taosCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
if (pObj->pTimer != tmrId) return;
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pObj->maxSessions; ++hash) {
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pthread_mutex_unlock(&pObj->mutex);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
void * pData = NULL;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
uint64_t time = taosGetTimestampMs();
hash = taosHashConn(pObj, ip, port, user);
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
while (pNode) {
if (time >= pObj->keepTimer + pNode->time) {
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pNode = NULL;
break;
}
if (pNode->ip == ip && pNode->port == port) break;
pNode = pNode->next;
}
if (pNode) {
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pObj->connHashList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
pData = pNode->data;
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pObj->total--;
pObj->count[hash]--;
}
pthread_mutex_unlock(&pObj->mutex);
if (pData) {
tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]);
}
return pData;
}
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
SConnHash **connHashList;
mpool_h connHashMemPool;
SConnCache *pObj;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
connHashList = calloc(sizeof(SConnHash *), maxSessions);
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
}
pObj = malloc(sizeof(SConnCache));
if (pObj == NULL) {
taosMemPoolCleanUp(connHashMemPool);
free(connHashList);
return NULL;
}
memset(pObj, 0, sizeof(SConnCache));
pObj->count = calloc(sizeof(int), maxSessions);
pObj->total = 0;
pObj->keepTimer = keepTimer;
pObj->maxSessions = maxSessions;
pObj->connHashMemPool = connHashMemPool;
pObj->connHashList = connHashList;
pObj->cleanFp = cleanFp;
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
pthread_mutex_init(&pObj->mutex, NULL);
return pObj;
}
void taosCloseConnCache(void *handle) {
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
pthread_mutex_lock(&pObj->mutex);
taosTmrStopA(&(pObj->pTimer));
if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool);
tfree(pObj->connHashList);
tfree(pObj->count)
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_destroy(&pObj->mutex);
memset(pObj, 0, sizeof(SConnCache));
free(pObj);
}
......@@ -21,7 +21,7 @@
#include "tmd5.h"
#include "tmempool.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsdb.h"
#include "tsocket.h"
#include "ttcpclient.h"
#include "ttcpserver.h"
......@@ -31,15 +31,40 @@
#include "tutil.h"
#include "lz4.h"
typedef struct _msg_node {
struct _msg_node *next;
void * ahandle;
int msgLen;
} SMsgNode;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((STaosHeader *) (cont - sizeof(SRpcHeader)))
#define rpcContFromHeader(msg) ( msg + sizeof(SRpcHeader))
#define rpcMsgLenFromCont(contLen) ( contLen + sizeof(SRpcHeader))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader))
#define rpcIsReq(type) (type & 1U)
typedef struct {
int sessions;
int numOfThreads;
int type;
int idleTime; // milliseconds;
uint16_t localPort;
char label[12];
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
void *(*fp)(char *, void *ahandle, void *thandle); // FP to call the application
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info
SRpcConn *connList;
void *idPool;
void *tmrCtrl;
void *hash;
void *shandle; // returned handle from lower layer during initialization
void *pCache; // connection cache
pthread_mutex_t mutex;
} SRpcInfo;
typedef struct {
void * signature;
int chann; // channel ID
void *signature;
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
......@@ -58,51 +83,54 @@ typedef struct {
uint16_t inTranId;
uint8_t outType;
char inType;
char closing;
char rspReceived;
void * chandle; // handle passed by TCP/UDP connection layer
void * ahandle; // handle returned by upper app layter
void *chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter
int retry;
int tretry; // total retry
void * pTimer;
void * pIdleTimer;
char * pRspMsg;
char * pQuickRsp;
void *pTimer;
void *pIdleTimer;
char *pRspMsg; // including header
int rspMsgLen;
SMsgNode * pMsgNode;
SMsgNode * pHead, *pTail;
struct rpc_server *pServer;
char *pReqMsg; // including header
int reqMsgLen;
SRpcInfo *pRpc;
} SRpcConn;
typedef struct {
int sessions;
void * qhandle; // for scheduler
SRpcConn * connList;
void * idPool;
void * tmrCtrl;
void * hash;
pthread_mutex_t mutex;
} SRpcChann;
SRpcIpSet ipSet;
void *ahandle;
SRpcInfo *pRpc;
char type;
char *pCont;
int contLen;
int numOfRetry;
char msg[];
} SRpcReqContext;
typedef struct rpc_server {
void *shandle; // returned handle from lower layer during initialization
void *qhandle; // for scheduler
int bits; // number of bits for session ID
int mask;
int numOfChanns;
int numOfThreads;
int idMgmt; // ID management method
int type;
int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection
uint16_t localPort;
char label[12];
void *(*fp)(char *, void *ahandle, void *thandle);
void (*efp)(int); // FP to report error
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // FP to retrieve auth info
SRpcChann *channList;
} STaosRpc;
typedef struct {
char version : 4;
char comp : 4;
char tcp : 2;
char spi : 3;
char encrypt : 3;
uint16_t tranId;
uint32_t uid; // for unique ID inside a client
uint32_t sourceId;
uint32_t destId;
uint32_t destIp;
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only
char empty[1];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} SRpcHeader;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
int tsRpcProgressTime = 10; // milliseocnds
......@@ -111,13 +139,25 @@ int tsRpcMaxRetry;
int tsRpcHeadSize;
void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpServer, taosInitUdpClient, taosInitTcpServer, taosInitTcpClient};
taosInitUdpServer,
taosInitUdpClient,
taosInitTcpServer,
taosInitTcpClient
};
void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
taosCleanUpTcpClient};
void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpUdpConnection,
taosCleanUpUdpConnection,
taosCleanUpTcpServer,
taosCleanUpTcpClient
};
int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData, taosSendTcpServerData, taosSendTcpClientData};
taosSendUdpData,
taosSendUdpData,
taosSendTcpServerData,
taosSendTcpClientData
};
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
taosOpenUdpConnection,
......@@ -126,19 +166,22 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) =
taosOpenTcpClientConnection,
};
void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpServerConnection, taosCloseTcpClientConnection};
void (*taosCloseConn[])(void *chandle) = {
NULL,
NULL,
taosCloseTcpServerConnection,
taosCloseTcpClientConnection
};
int taosReSendRspToPeer(SRpcConn *pConn);
void taosProcessTaosTimer(void *, void *);
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle);
int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen);
void taosProcessSchedMsg(SSchedMsg *pMsg);
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int rpcResendRspToPeer(SRpcConn *pConn);
void rpcProcessRetryTimer(void *, void *);
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle);
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen);
int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader* pHeader = rpcHeaderFromCont(pCont);
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
......@@ -146,7 +189,7 @@ static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
return contLen;
}
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
char *buf = malloc (contLen + overhead+8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
......@@ -181,131 +224,97 @@ static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
return finalLen;
}
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
int overhead = sizeof(int32_t) * 2;
SRpcHeader *pNewHeader = NULL;
char *pCont = pHeader->content;
if (pHeader->comp == 0) {
pSchedMsg->msg = (char *)(&(pHeader->destId));
return pHeader;
}
if (pHeader->comp) {
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
int contLen = htonl(GET_INT32_VAL(pCont + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = malloc(sizeof(STaosHeader) + contLen);
//tDump(pHeader->content, msgLen);
char *buf = rpcMallocCont(contLen);
if (buf) {
int32_t originalLen = LZ4_decompress_safe((const char*)(pHeader->content + overhead), buf + sizeof(STaosHeader),
msgLen - overhead, contLen);
memcpy(buf, pHeader, sizeof(STaosHeader));
free(pHeader); // free the compressed message buffer
STaosHeader* pNewHeader = (STaosHeader *) buf;
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
pNewHeader = rpcHeaderFromCont(buf);
int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
//tDump(pHeader->content, contLen);
return pNewHeader;
memcpy(pNewHeader, pHeader, sizeof(SRpcHeader));
pNewHeader->msgLen = rpcMsgLenFromCont(originalLen);
free(pHeader); // free the compressed message buffer
pHeader = pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
pSchedMsg->msg = NULL;
}
}
return NULL;
return pHeader;
}
char *taosBuildReqHeader(void *param, char type, char *msg) {
STaosHeader *pHeader;
SRpcConn * pConn = (SRpcConn *)param;
char *rpcMallocCont(int size) {
char *pMsg = NULL;
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be openned first before building a message", pConn);
size += RPC_MSG_OVERHEAD;
pMsg = (char *)calloc(1, (size_t)size);
if (pMsg == NULL) {
tError("failed to malloc msg, size:%d", size);
return NULL;
}
pHeader = (STaosHeader *)(msg + sizeof(SMsgNode));
memset(pHeader, 0, sizeof(STaosHeader));
pHeader->version = 1;
pHeader->comp = 0;
pHeader->msgType = type;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->port = 0;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
}
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
void rpcFreeCont(char *cont) {
char *msg = cont - sizeof(SRpcHeader);
free(msg);
}
return (char *)pHeader->content;
static void rpcFreeMsg(char *msg) {
msg -= sizeof(SRpcReqContext);
free(msg);
}
char *taosBuildReqMsgWithSize(void *param, char type, int size) {
STaosHeader *pHeader;
char * pMsg;
SRpcConn * pConn = (SRpcConn *)param;
void rpcSendSimpleRsp(void *thandle, int_32 code) {
char *pMsg;
STaosRsp *pRsp;
int msgLen = sizeof(STaosRsp);
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be openned first before building a message", pConn);
return NULL;
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return;
}
size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest);
pMsg = (char *)malloc((size_t)size);
memset(pMsg, 0, (size_t)size);
pHeader = (STaosHeader *)(pMsg + sizeof(SMsgNode));
pHeader->version = 1;
pHeader->msgType = type;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
pMsg = rpcMallocCont(msgLen);
if (pMsg == NULL) return;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pRsp = (STaosRsp *)pMsg;
pRsp->code = htonl(code);
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
taosSendResponse(thandle, pMsg, msgLen);
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
return (char *)pHeader->content;
return;
}
char *taosBuildRspMsgWithSize(void *param, char type, int size) {
STaosHeader *pHeader;
char * pMsg;
SRpcConn * pConn = (SRpcConn *)param;
if (pConn == NULL || pConn->signature != pConn) {
tError("pConn:%p, connection has to be opened first before building a message", pConn);
return NULL;
}
static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD + sizeof(STaosRsp)];
SRpcHeader *pHeader;
int msgLen;
size += sizeof(SMsgNode) + sizeof(STaosHeader) + sizeof(STaosDigest);
pMsg = (char *)malloc((size_t)size);
if (pMsg == NULL) {
tError("pConn:%p, malloc(%d) failed when building a type:%d message", pConn, size, type);
return NULL;
}
pRsp = (STaosRsp *)rpcContFromHeader(msg);
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
memset(pMsg, 0, (size_t)size);
pHeader = (STaosHeader *)pMsg;
// set msg header
memset(msg, 0, sizeof(SRpcHeader));
pHeader = (SRpcHeader *)msg;
pHeader->version = 1;
pHeader->msgType = type;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
......@@ -315,334 +324,240 @@ char *taosBuildRspMsgWithSize(void *param, char type, int size) {
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
return (char *)pHeader->content;
}
int taosSendSimpleRsp(void *thandle, char rsptype, char code) {
char *pMsg, *pStart;
int msgLen;
if (thandle == NULL) {
tError("connection is gone, response could not be sent");
return -1;
}
pStart = taosBuildRspMsgWithSize(thandle, rsptype, 32);
if (pStart == NULL) {
tError("build rsp msg error, return null prt");
return -1;
}
pMsg = pStart;
*pMsg = code;
pMsg++;
msgLen = (int)(pMsg - pStart);
taosSendMsgToPeer(thandle, pStart, msgLen);
return msgLen;
rpcSendDataToPeer(pConn, (char *)msg, msgLen);
}
int taosSendQuickRsp(void *thandle, char rsptype, char code) {
char * pCont;
int contLen;
STaosHeader *pHeader;
char * msg;
int msgLen;
SRpcConn * pConn = (SRpcConn *)thandle;
void *rpcOpen(SRpcInit *pInit) {
SRpcInfo *pRpc;
pCont = taosBuildRspMsgWithSize(thandle, rsptype, 32);
if (pCont == NULL) return 0;
*pCont = code;
contLen = 1;
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
msg = (char *)pHeader;
msgLen = contLen + (int32_t)sizeof(STaosHeader);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
STaosDigest *pDigest = (STaosDigest *)(pCont + contLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(STaosDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
strcpy(pRpc->label, pInit->label);
pRpc->fp = pInit->fp;
pRpc->type = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads;
if (pRpc->numOfThreads > TSDB_MAX_RPC_THREADS) {
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
}
tfree(pConn->pQuickRsp);
pConn->pQuickRsp = msg;
taosSendDataToPeer(pConn, (char *)pHeader, msgLen);
return msgLen;
}
void *taosOpenRpc(SRpcInit *pRpc) {
STaosRpc *pServer;
pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp;
pRpc->sessions = pInit->session;
strcpy(pRpc->meterId, pInit->meterId);
pRpc->spi = pInit->spi;
strcpy(pRpc->secret, pInit->secret);
strcpy(pRpc->ckey, pInit->ckey);
pRpc->afp = pInit->afp;
pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
taosProcessDataFromPeer, pRpc);
if (pRpc->shandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
taosCloseRpc(pRpc);
return NULL;
}
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = sizeof(STaosHeader) + sizeof(SMsgNode);
pServer = (STaosRpc *)malloc(sizeof(STaosRpc));
if (pServer == NULL) return NULL;
memset(pServer, 0, sizeof(STaosRpc));
pServer->bits = pRpc->bits;
pServer->mask = (1 << (pRpc->bits)) - 1;
pServer->numOfChanns = pRpc->numOfChanns;
strcpy(pServer->label, pRpc->label);
pServer->fp = pRpc->fp;
pServer->idMgmt = pRpc->idMgmt;
pServer->type = pRpc->connType;
pServer->idleTime = pRpc->idleTime;
pServer->noFree = pRpc->noFree;
pServer->numOfThreads = pRpc->numOfThreads;
if (pServer->numOfThreads > TSDB_MAX_RPC_THREADS) {
pServer->numOfThreads = TSDB_MAX_RPC_THREADS;
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
size_t size = sizeof(SRpcConn) * sessions;
pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) {
tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size);
taosCloseRpc(pRpc);
return NULL;
}
pServer->localPort = pRpc->localPort;
pServer->qhandle = pRpc->qhandle;
pServer->efp = pRpc->efp;
pServer->afp = pRpc->afp;
int size = (int)sizeof(SRpcChann) * pRpc->numOfChanns;
pServer->channList = (SRpcChann *)malloc((size_t)size);
if (pServer->channList == NULL) {
tError("%s, failed to malloc channList", pRpc->label);
tfree(pServer);
pRpc->idPool = taosInitIdPool(sessions);
if (pRpc->idPool == NULL) {
tError("%s failed to init ID pool", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
memset(pServer->channList, 0, (size_t)size);
pServer->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
taosProcessDataFromPeer, pServer);
if (pServer->shandle == NULL) {
tError("%s, failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
taosCloseRpc(pServer);
pRpc->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pRpc->label);
if (pRpc->tmrCtrl == NULL) {
tError("%s failed to init timers", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
if (pServer->numOfChanns == 1) {
int retVal = taosOpenRpcChann(pServer, 0, pRpc->sessionsPerChann);
if (0 != retVal) {
tError("%s, failed to open rpc chann", pRpc->label);
taosCloseRpc(pServer);
pRpc->hash = taosInitStrHash(sessions, sizeof(pRpc), taosHashString);
if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
pRpc->pCahche = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label);
taosCloseRpc(pRpc);
return NULL;
}
pthread_mutex_init(&pRpc->mutex, NULL);
tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads);
return pServer;
return pRpc;
}
int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann;
tTrace("cid:%d, handle:%p open rpc chann", cid, handle);
void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param;
if (pServer == NULL) return -1;
if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
return -1;
}
pChann = pServer->channList + cid;
memset(pChann, 0, sizeof(SRpcChann));
size_t size = sizeof(SRpcConn) * sessions;
pChann->connList = (SRpcConn *)calloc(1, size);
if (pChann->connList == NULL) {
tError("%s cid:%d, failed to allocate memory for taos connections, size:%d", pServer->label, cid, size);
return -1;
}
if (pServer->idMgmt == TAOS_ID_FREE) {
pChann->idPool = taosInitIdPool(sessions);
if (pChann->idPool == NULL) {
tError("%s cid:%d, failed to init ID pool", pServer->label, cid);
return -1;
}
}
(*taosCleanUpConn[pRpc->type])(pRpc->shandle);
pChann->tmrCtrl = taosTmrInit(sessions * 2 + 1, 50, 10000, pServer->label);
if (pChann->tmrCtrl == NULL) {
tError("%s cid:%d, failed to init timers", pServer->label, cid);
return -1;
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].signature != NULL) {
taosCloseRpcConn((void *)(pRpc->connList + i));
}
pChann->hash = taosInitStrHash(sessions, sizeof(pChann), taosHashString);
if (pChann->hash == NULL) {
tError("%s cid:%d, failed to init string hash", pServer->label, cid);
return -1;
}
pthread_mutex_init(&pChann->mutex, NULL);
pChann->sessions = sessions;
pChann->qhandle = qhandle ? qhandle : pServer->qhandle;
taosCleanUpStrHash(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool);
taosCloseConnCache(pRpc->pCache);
return TSDB_CODE_SUCCESS;
tfree(pRpc->connList);
pthread_mutex_destroy(&pRpc->mutex);
tfree(pRpc);
}
void taosCloseRpcChann(void *handle, int cid) {
STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann;
tTrace("cid:%d, handle:%p close rpc chann", cid, handle);
static SRpcConn *rpcOpenConn(SRpcConnInit *pInit) {
SRpcConn *pConn;
SRpcInfo *pRpc = (SRpcInfo *)pInit->shandle;
if (pServer == NULL) return;
if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
return;
}
if ( (uint8_t)(rpcGetConn(pInit->sid, pInit->meterId, pRpc, &pConn, 1, NULL)) != 0 )
return NULL;
pChann = pServer->channList + cid;
if (pConn->peerId == 0) pConn->peerId = pRpc->peerId;
strcpy(pConn->peerIpstr, pInit->peerIp);
pConn->peerIp = inet_addr(pInit->peerIp);
pConn->peerPort = pInit->peerPort;
pConn->ahandle = pInit->ahandle;
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
for (int i = 0; i < pChann->sessions; ++i) {
if (pChann->connList[i].signature != NULL) {
taosCloseRpcConn((void *)(pChann->connList + i));
// if it is client, it shall set up connection first
if (taosOpenConn[pRpc->type]) {
pConn->chandle = (*taosOpenConn[pRpc->type])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s pConn:%p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else {
tError("%s pConn:%p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort);
terrorno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
}
}
taosCleanUpStrHash(pChann->hash);
taosTmrCleanUp(pChann->tmrCtrl);
taosIdPoolCleanUp(pChann->idPool);
tfree(pChann->connList);
pthread_mutex_destroy(&pChann->mutex);
memset(pChann, 0, sizeof(SRpcChann));
return pConn;
}
void taosCloseRpcConn(void *thandle) {
static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn == NULL) return;
STaosRpc *pServer = pConn->pServer;
if (pConn->signature != thandle || pServer == NULL) return;
if (pConn->closing) return;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->signature != thandle || pRpc == NULL) return;
pthread_mutex_lock(&pChann->mutex);
pthread_mutex_lock(&pRpc->mutex);
pConn->closing = 1;
pConn->signature = NULL;
if (taosCloseConn[pServer->type]) (*taosCloseConn[pServer->type])(pConn->chandle);
if (taosCloseConn[pRpc->type]) (*taosCloseConn[pRpc->type])(pConn->chandle);
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
tfree(pConn->pRspMsg);
if (pServer->noFree == 0) free(pConn->pMsgNode);
pConn->pMsgNode = NULL;
tfree(pConn->pQuickRsp);
SMsgNode *pMsgNode;
while (pConn->pHead) {
pMsgNode = pConn->pHead;
pConn->pHead = pConn->pHead->next;
memset(pMsgNode, 0, sizeof(SMsgNode));
if (pServer->noFree == 0) free(pMsgNode);
}
rpcFreeMsg(pConn->pRspMsg);
rpcFreeMsg(pConn-pReqMsg);
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pChann->hash, hashstr);
taosDeleteStrHash(pRpc->hash, hashstr);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection closed, pConn:%p", pServer->label, pConn->chann, pConn->sid,
tTrace("%s pConn:%p, TAOS connection closed", pRpc->label, pConn->sid,
pConn->meterId, pConn);
int freeId = pConn->sid;
memset(pConn, 0, sizeof(SRpcConn));
if (pChann->idPool) taosFreeId(pChann->idPool, freeId);
if (pRpc->idPool) taosFreeId(pRpc->idPool, freeId);
pthread_mutex_unlock(&pChann->mutex);
pthread_mutex_unlock(&pRpc->mutex);
}
int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcConn **ppConn, char req, char *hashstr) {
static int rpcGetConn(int sid, char *meterId, SRpcInfo *pRpc, SRpcConn **ppConn, char req, char *hashstr) {
SRpcConn * pConn = NULL;
SRpcChann *pChann;
if (pServer == NULL) return -1;
pChann = pServer->channList + chann;
if (pRpc == NULL) return -1;
if (pServer->idMgmt == TAOS_ID_FREE) {
if (sid == 0) {
if (req) {
int osid = sid;
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pChann->hash, hashstr);
SRpcConn **ppConn = (SRpcConn **)taosGetStrHashData(pRpc->hash, hashstr);
if (ppConn) pConn = *ppConn;
if (pConn == NULL) {
sid = taosAllocateId(pChann->idPool);
sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) {
tError("%s cid:%d, maximum number of sessions:%d is reached", pServer->label, chann, pChann->sessions);
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
return TSDB_CODE_MAX_SESSIONS;
} else {
tTrace("%s cid:%d sid:%d, ID allocated, used:%d, old id:%d", pServer->label, chann, sid,
taosIdPoolNumOfUsed(pChann->idPool), osid);
tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid,
taosIdPoolNumOfUsed(pRpc->idPool), osid);
}
} else {
sid = pConn->sid;
tTrace("%s cid:%d sid:%d id:%s, session is already there", pServer->label, pConn->chann, pConn->sid,
tTrace("%s sid:%d id:%s, session is already there", pRpc->label, pConn->sid,
pConn->meterId);
}
} else {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
} else {
if (pChann->connList[sid].signature == NULL) {
tError("%s cid:%d, sid:%d session is already released", pServer->label, chann, sid);
if (pRpc->connList[sid].signature == NULL) {
tError("%s sid:%d session is already released", pRpc->label, sid);
return TSDB_CODE_INVALID_VALUE;
}
}
}
pConn = pChann->connList + sid;
if (pChann == NULL || pChann->connList == NULL) {
tTrace("%s cid:%d sid:%d, connlist is null, received:%s", pServer->label, chann, sid, meterId);
return TSDB_CODE_MISMATCHED_METER_ID;
}
pConn = pRpc->connList + sid;
if (pConn->signature == NULL) {
memset(pConn, 0, sizeof(SRpcConn));
pConn->signature = pConn;
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId));
pConn->pServer = pServer;
pConn->chann = chann;
pConn->pRpc = pRpc;
pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl((uint32_t)((pConn->chann << pServer->bits) + pConn->sid));
if (pServer->afp) {
int ret = (*pServer->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
pConn->ownId = htonl(pConn->sid);
if (pRpc->afp) {
int ret = (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
if (ret != 0) {
tWarn("%s cid:%d sid:%d id:%s, meterId not there pConn:%p", pServer->label, chann, sid, pConn->meterId, pConn);
taosFreeId(pChann->idPool, sid); // sid shall be released
tWarn("%s pConn:%p, meterId not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
memset(pConn, 0, sizeof(SRpcConn));
return ret;
}
}
if ((pServer->type == TAOS_CONN_UDPC || pServer->type == TAOS_CONN_UDPS) && pServer->numOfThreads > 1 &&
pServer->localPort) {
if ((pRpc->type == TAOS_CONN_UDPC || pRpc->type == TAOS_CONN_UDPS) && pRpc->numOfThreads > 1 &&
pRpc->localPort) {
// UDP server, assign to new connection
pServer->index = (pServer->index + 1) % pServer->numOfThreads;
pConn->localPort = (int16_t)(pServer->localPort + pServer->index);
pRpc->index = (pRpc->index + 1) % pRpc->numOfThreads;
pConn->localPort = (int16_t)(pRpc->localPort + pRpc->index);
}
taosAddStrHash(pChann->hash, hashstr, (char *)&pConn);
tTrace("%s cid:%d sid:%d id:%s, TAOS connection is allocated, localPort:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn->localPort, pConn);
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s pConn:%p, TAOS connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid);
} else {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s cid:%d sid:%d id:%s, meterId is not matched, received:%s", pServer->label, chann, sid, pConn->meterId,
meterId);
tTrace("%s pConn:%p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId);
return TSDB_CODE_MISMATCHED_METER_ID;
}
}
......@@ -652,215 +567,159 @@ int taosGetRpcConn(int chann, int sid, char *meterId, STaosRpc *pServer, SRpcCon
return TSDB_CODE_SUCCESS;
}
void *taosOpenRpcConn(SRpcConnInit *pInit, uint8_t *code) {
SRpcConn *pConn;
STaosRpc *pServer = (STaosRpc *)pInit->shandle;
*code = (uint8_t)(taosGetRpcConn(pInit->cid, pInit->sid, pInit->meterId, pServer, &pConn, 1, NULL));
if (*code == TSDB_CODE_MAX_SESSIONS) *code = TSDB_CODE_MAX_CONNECTIONS;
if (*code != TSDB_CODE_SUCCESS) return NULL;
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
if (pConn->peerId == 0) pConn->peerId = pInit->peerId;
if (pConn->spi == 0 ) return 0;
strcpy(pConn->peerIpstr, pInit->peerIp);
pConn->peerIp = inet_addr(pInit->peerIp);
pConn->peerPort = pInit->peerPort;
pConn->ahandle = pInit->ahandle;
pConn->spi = pInit->spi;
pConn->encrypt = pInit->encrypt;
if (pConn->spi) memcpy(pConn->secret, pInit->secret, TSDB_KEY_LEN);
if (pHeader->spi == pConn->spi) {
// authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest));
// if it is client, it shall set up connection first
if (taosOpenConn[pServer->type]) {
pConn->chandle = (*taosOpenConn[pServer->type])(pServer->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s cid:%d sid:%d id:%s, nw connection is set up, ip:%s:%hu localPort:%d pConn:%p", pServer->label,
pConn->chann, pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort, pConn);
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s pConn:%p, time diff:%d is too big, msg discarded, timestamp:%d", pRpc->label, pConn,
delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
tError("%s cid:%d sid:%d id:%s, failed to set up nw connection to ip:%s:%hu", pServer->label, pConn->chann,
pConn->sid, pInit->meterId, pConn->peerIpstr, pConn->peerPort);
*code = TSDB_CODE_NETWORK_UNAVAIL;
taosCloseRpcConn(pConn);
pConn = NULL;
if (rpcAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
char ipstr[24];
tinet_ntoa(ipstr, ip);
mLError("id:%s from %s, authentication failed", pHeader->meterId, ipstr);
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHeader->msgLen -= sizeof(SRpcDigest);
}
}
return pConn;
}
void taosCloseRpc(void *param) {
STaosRpc *pServer = (STaosRpc *)param;
(*taosCleanUpConn[pServer->type])(pServer->shandle);
for (int cid = 0; cid < pServer->numOfChanns; ++cid) taosCloseRpcChann(pServer, cid);
tfree(pServer->channList);
tfree(pServer);
}
int taosSetSecurityInfo(int chann, int sid, char *id, int spi, int encrypt, char *secret, char *ckey) {
/*
SRpcConn *pConn;
pConn = connList[chann*tsSessionsPerChann + sid];
if ( pConn == NULL ) {
pConn = (SRpcConn *)sizeof(SRpcConn);
if ( pConn == NULL ) {
tError("failed to allocate memory for taosConn");
return -1;
} else {
// if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
}
memset(pConn, 0, sizeof(SRpcConn));
pConn->chann = chann;
pConn->sid = sid;
}
pConn->spi = spi;
pConn->encrypt = encrypt;
memcpy(pConn->secret, pConn->secret, TSDB_KEY_LEN);
memcpy(pConn->cipheringKey, ckey, TSDB_KEY_LEN);
memcpy(pConn->meterId, id, TSDB_TABLE_ID_LEN);
*/
return -1;
return code;
}
int taosSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
int writtenLen = 0;
STaosRpc * pServer = pConn->pServer;
STaosHeader *pHeader = (STaosHeader *)data;
static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
int code = 0;
if (pConn->signature != pConn || pServer == NULL) return -1;
if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn,
pConn->peerId, pHeader->sourceId);
return TSDB_CODE_INVALID_VALUE;
}
}
if (pHeader->msgType & 1) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId, pConn);
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]);
taosSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId);
rpcResendRspToPeer(pConn);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace(
"%s cid:%d sid:%d id:%s, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d pConn:%p",
pServer->label, pConn->chann, pConn->sid, pConn->meterId, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, (uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId,
pConn);
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]);
}
writtenLen = (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
// do not reply any message
return TSDB_CODE_ALREADY_PROCESSED;
}
if (writtenLen != dataLen)
tError("%s cid:%d sid:%d id:%s, dataLen:%d writtenLen:%d, not good, reason:%s", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, dataLen, writtenLen, strerror(errno));
// assert ( writtenLen == dataLen );
tDump(data, dataLen);
if (pConn->inType != 0) {
tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
pConn->inTranId, pHeader->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
}
pConn->inTranId = pHeader->tranId;
pConn->inType = pHeader->msgType;
return 0;
}
void taosProcessResponse(SRpcConn *pConn) {
STaosHeader *pHeader;
char * msg = NULL;
int msgLen = 0;
static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
pConn->peerId = pHeader->sourceId;
if (pConn == NULL) return;
STaosRpc *pServer = pConn->pServer;
if (pConn->signature != pConn || pServer == NULL) return;
SRpcChann *pChann = pServer->channList + pConn->chann;
if (pConn->outType == 0 || pConn->pContext == NULL) {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
pthread_mutex_lock(&pChann->mutex);
if (pHeader->tranId != pConn->outTranId) {
return TSDB_CODE_INVALID_TRAN_ID;
}
pConn->outType = 0;
pConn->rspReceived = 0;
if (pServer->noFree == 0) tfree(pConn->pMsgNode);
pConn->pMsgNode = NULL;
if (pConn->pHead) {
SMsgNode *pMsgNode = pConn->pHead;
// assert ( pMsgNode->msgLen >= sizeof(STaosHeader) && pMsgNode->msgLen < RPC_MAX_UDP_SIZE);
if (pMsgNode->msgLen >= sizeof(STaosHeader)) {
pConn->pMsgNode = pMsgNode;
pConn->pHead = pMsgNode->next;
if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle;
if (pHeader->msgType != pConn->outType + 1) {
return TSDB_CODE_INVALID_RESPONSE_TYPE;
}
pHeader = (STaosHeader *)((char *)pMsgNode + sizeof(SMsgNode));
pConn->outType = pHeader->msgType;
pConn->outTranId = pHeader->tranId;
if (*pHeader->content == TSDB_CODE_NOT_READY) {
return = TSDB_CODE_ALREADY_PROCESSED;
}
msg = (char *)pHeader;
msgLen = pMsgNode->msgLen;
taosTmrStopA(&pConn->pTimer);
pConn->retry = 0;
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn);
pConn->tretry++;
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
} else {
tError("%s cid:%d sid:%d id:%s, invalid msgLen:%d pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pMsgNode->msgLen, pConn);
pConn->pHead = NULL;
}
if (pConn->pHead == NULL) pConn->pTail = NULL;
// peer still in processing, give up
*pHeader->content = TSDB_CODE_TOO_SLOW;
}
if (msg) {
taosSendDataToPeer(pConn, msg, msgLen);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
pConn->tretry = 0;
pConn->outType = 0;
pConn->pReqMsg = NULL;
pConn->pReqMsgLen = 0;
}
int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pServer, int dataLen, uint32_t ip,
static int rpcProcessHeader(SRpcHeader *pHeader, SRpcConn **ppConn, SRpcInfo *pRpc, int dataLen, uint32_t ip,
uint16_t port, void *chandle) {
int chann, sid, code = 0;
int sid, code = 0;
SRpcConn * pConn = NULL;
SRpcChann *pChann;
int msgLen;
char hashstr[40] = {0};
// int reSend = 0;
*ppConn = NULL;
uint32_t destId = htonl(pHeader->destId);
chann = destId >> pServer->bits;
sid = destId & pServer->mask;
uint32_t sid = htonl(pHeader->destId);
if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) {
tTrace("%s cid:%d sid:%d, invalid message type:%d", pServer->label, chann, sid, pHeader->msgType);
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType);
return TSDB_CODE_INVALID_MSG_TYPE;
}
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != msgLen) {
tTrace("%s cid:%d sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pServer->label, chann, sid,
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
if (dataLen != pHeader->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHeader->msgType], dataLen, msgLen);
return TSDB_CODE_INVALID_MSG_LEN;
}
if (chann < 0 || chann >= pServer->numOfChanns) {
tTrace("%s cid:%d sid:%d, chann is out of range, max:%d, %s discarded", pServer->label, chann, sid,
pServer->numOfChanns, taosMsg[pHeader->msgType]);
return TSDB_CODE_INVALID_SESSION_ID;
}
pChann = pServer->channList + chann;
if (pChann->sessions == 0) {
tTrace("%s cid:%d, chann is not activated yet, %s discarded", pServer->label, chann, taosMsg[pHeader->msgType]);
if (pServer->efp) (*(pServer->efp))(chann);
return TSDB_CODE_NOT_ACTIVE_SESSION;
}
if (sid < 0 || sid >= pChann->sessions) {
tTrace("%s cid:%d sid:%d, sid is out of range, max sid:%d, %s discarded", pServer->label, chann, sid,
pChann->sessions, taosMsg[pHeader->msgType]);
if (sid < 0 || sid >= pRpc->sessions) {
tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
pRpc->sessions, taosMsg[pHeader->msgType]);
return TSDB_CODE_INVALID_SESSION_ID;
}
// if ( pHeader->tcp ) return TSDB_CODE_ALREADY_PROCESSED;
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId);
pthread_mutex_lock(&pChann->mutex);
code = taosGetRpcConn(chann, sid, pHeader->meterId, pServer, &pConn, pHeader->msgType & 1, hashstr);
if (code != TSDB_CODE_SUCCESS) goto _exit;
code = rpcGetConn(sid, pHeader->meterId, pRpc, &pConn, rpcIsReq(pHeader->msgType), hashstr);
if (code != TSDB_CODE_SUCCESS) return code;
*ppConn = pConn;
sid = pConn->sid;
......@@ -873,175 +732,44 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
}
if (pHeader->uid) pConn->peerUid = pHeader->uid;
if (port) pConn->peerPort = port;
if (pHeader->port) // port maybe changed by the peer
pConn->peerPort = pHeader->port;
if (chandle) pConn->chandle = chandle;
if (pHeader->tcp) {
tTrace("%s cid:%d sid:%d id:%s, content will be transfered via TCP pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
if (pConn->outType) taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn);
if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
}
if (pConn->spi != 0) {
if (pHeader->spi == pConn->spi) {
// authentication
STaosDigest *pDigest = (STaosDigest *)((char *)pHeader + dataLen - sizeof(STaosDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s cid:%d sid:%d id:%s, time diff:%d is too big, msg discarded pConn:%p, timestamp:%d", pServer->label,
chann, sid, pConn->meterId, delta, pConn, htonl(pDigest->timeStamp));
// the requirement of goldwind, should not return error in this case
code = TSDB_CODE_INVALID_TIME_STAMP;
goto _exit;
}
if (taosAuthenticateMsg((uint8_t *)pHeader, dataLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
char ipstr[24];
tinet_ntoa(ipstr, ip);
mLError("user:%s login from %s, authentication failed", pHeader->meterId, ipstr);
tError("%s cid:%d sid:%d id:%s, authentication failed, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
} else {
// if it is request or response with code 0, msg shall be discarded
if ((pHeader->msgType & 1) || (pHeader->content[0] == 0)) {
tTrace("%s cid:%d sid:%d id:%s, auth spi not matched, msg discarded pConn:%p", pServer->label, chann, sid,
pConn->meterId, pConn);
code = TSDB_CODE_AUTH_FAILURE;
goto _exit;
}
}
}
code = rpcCheckAuthentication(pConn, (char *)pHeader, dataLen);
if ( code != 0 ) return code;
if (pHeader->msgType != TSDB_MSG_TYPE_REG && pHeader->encrypt) {
// decrypt here
}
pHeader->destId = pConn->ownId; // destId maybe 0, it shall be changed
if (pHeader->msgType & 1) {
if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
tTrace("%s cid:%d sid:%d id:%s, source Id is changed, old:0x%08x new:0x%08x pConn:%p", pServer->label, chann,
sid, pConn->meterId, pConn->peerId, pHeader->sourceId, pConn);
code = TSDB_CODE_INVALID_VALUE;
goto _exit;
}
}
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s cid:%d sid:%d id:%s, %s is retransmitted, pConn:%p", pServer->label, chann, sid, pConn->meterId,
taosMsg[pHeader->msgType], pConn);
taosSendQuickRsp(pConn, (char)(pHeader->msgType + 1), TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s cid:%d sid:%d id:%s, %s is already processed, tranId:%d pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn->inTranId, pConn);
taosReSendRspToPeer(pConn);
if ( rpcIsReq(pHeader->msgType) ) {
code = rpcProcessReqHeader(pConn, pHeader);
} else {
tTrace("%s cid:%d sid:%d id:%s, mismatched message %s and tranId pConn:%p", pServer->label, chann, sid,
pConn->meterId, taosMsg[pHeader->msgType], pConn);
}
// do not reply any message
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
code = rpcProcessRspHeader(pConn, pHeader);
}
if (pConn->inType != 0) {
tTrace("%s cid:%d sid:%d id:%s, last session is not finished, inTranId:%d tranId:%d pConn:%p", pServer->label,
chann, sid, pConn->meterId, pConn->inTranId, pHeader->tranId, pConn);
code = TSDB_CODE_LAST_SESSION_NOT_FINISHED;
goto _exit;
}
pConn->inTranId = pHeader->tranId;
pConn->inType = pHeader->msgType;
if (sid == 0) // send a response first
taosSendQuickRsp(pConn, (char)(pConn->inType + 1), TSDB_CODE_ACTION_IN_PROGRESS);
} else {
// response from taos
pConn->peerId = pHeader->sourceId;
if (pConn->outType == 0) {
code = TSDB_CODE_UNEXPECTED_RESPONSE;
goto _exit;
}
if (pHeader->tranId != pConn->outTranId) {
code = TSDB_CODE_INVALID_TRAN_ID;
goto _exit;
}
if (pHeader->msgType != pConn->outType + 1) {
code = TSDB_CODE_INVALID_RESPONSE_TYPE;
goto _exit;
}
if (*pHeader->content == TSDB_CODE_NOT_READY) {
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
}
taosTmrStopA(&pConn->pTimer);
pConn->retry = 0;
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid,
pHeader->meterId, pConn);
pConn->tretry++;
taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer);
code = TSDB_CODE_ALREADY_PROCESSED;
goto _exit;
} else {
// peer still in processing, give up
*pHeader->content = TSDB_CODE_TOO_SLOW;
}
}
pConn->tretry = 0;
if (pConn->rspReceived) {
code = TSDB_CODE_UNEXPECTED_RESPONSE;
goto _exit;
} else {
pConn->rspReceived = 1;
}
}
_exit:
pthread_mutex_unlock(&pChann->mutex);
// if (reSend) taosReSendRspToPeer(pConn);
return code;
}
int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
STaosHeader *pRecvHeader, *pReplyHeader;
char * pContent;
void rpcSendErrorMsgToPeer(char *pMsg, int32 code, uint_32 ip, uint_16 port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(STaosRsp)];
STaosRsp *pRsp;
uint32_t timeStamp;
int msgLen;
pRecvHeader = (STaosHeader *)pMsg;
pReplyHeader = (STaosHeader *)pReply;
pRecvHeader = (SRpcHeader *)pMsg;
pReplyHeader = (SRpcHeader *)msg;
memset(msg, 0, sizeof(SRpcHeader));
pReplyHeader->version = pRecvHeader->version;
pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1);
pReplyHeader->tcp = 0;
......@@ -1052,299 +780,328 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
pContent = (char *)pReplyHeader->content;
*pContent = (char)code;
pContent++;
pRsp = (STaosRsp *)pReplyHeader->content;
pRsp->code = htonl(code);
msgLen = sizeof(STaosRsp);
char *pContent = pRsp->more;
if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well
timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp));
pContent += sizeof(timeStamp);
msgLen += sizeof(timeStamp);
}
msgLen = (int)(pContent - pReply);
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->type])(ip, port, pReply, msgLen, chandle);
return msgLen;
}
void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn)
{
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
return;
}
void taosProcessIdleTimer(void *param, void *tmrId) {
void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
if (pConn->signature != param) {
tError("idle timer pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
return;
}
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo * pRpc = pConn->pRpc;
if (pConn->pIdleTimer != tmrId) {
tTrace("%s cid:%d sid:%d id:%s, idle timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, tmrId, pConn);
tTrace("%s pConn:%p, idle timer:%p already processed", pRpc->label, pConn, tmrId);
return;
}
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
reportDisc = 1;
}
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
tTrace("%s pConn:%p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
}
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *rpcProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
void *chandle) {
STaosHeader *pHeader;
SRpcHeader *pHeader = (SRpcHeader *)data;
uint8_t code;
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer = (STaosRpc *)shandle;
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = (SRpcInfo *)shandle;
int msgLen;
char pReply[128];
SSchedMsg schedMsg;
int chann, sid;
SRpcChann * pChann = NULL;
tDump(data, dataLen);
if (ip == 0 && taosCloseConn[pServer->type]) {
// it means the connection is broken
if (pConn) {
pChann = pServer->channList + pConn->chann;
tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
pConn->rspReceived = 1;
pConn->chandle = NULL;
taosReportDisconnection(pChann, pConn);
}
tfree(data);
if (ip == 0 && taosCloseConn[pRpc->type] && pConn) {
// it means the connection is broken, it only happens for TCP
tTrace("%s pConn:%p, underlying link is gone%p", pRpc->label, pConn);
pContext->terrno = TSDB_CODE_NETWORK_UNAVAIL;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
return NULL;
}
pHeader = (STaosHeader *)data;
msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle);
pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHeader(pHeader, &pConn, pRpc, dataLen, ip, port, chandle);
pthread_mutex_unlock(&pRpc->mutex);
pHeader->destId = htonl(pHeader->destId);
chann = pHeader->destId >> pServer->bits;
sid = pHeader->destId & pServer->mask;
if (pConn && pServer->idleTime) {
SRpcChann *pChann = pServer->channList + pConn->chann;
taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], ip, port, code,
dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
}
if (code == TSDB_CODE_ALREADY_PROCESSED) {
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
pHeader->tranId, pConn);
free(data);
return pConn;
if (pConn && pRpc->idleTime) {
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
}
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace(
"%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d source:0x%08x dest:0x%08x "
"tranId:%d pConn:%p",
pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0],
dataLen, pHeader->sourceId, htonl(pHeader->destId), pHeader->tranId, pConn);
if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error
if ( rpcIsReq(pHeader->msgType) ) {
taosSendErrorMsgToPeer(data, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code);
}
} else { // parsing OK
rpcProcessIncomingMsg(pConn, pHeader);
}
}
if (code != 0) {
// parsing error
if ( code != 0 ) free (data);
return pConn;
}
if (pHeader->msgType & 1U) {
memset(pReply, 0, sizeof(pReply));
void taosProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
SRpcInfo *pRpc = pConn->pRpc;
int msgLen = rpcContLenFromHeader(pHeader->msgLen);
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType + 1], code, pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, %s is received, parsing error:%u pConn:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], code, pConn);
}
pHeader = rpcDecompressRpcMsg(pHeader);
free(data);
if ( rpcIsReq(msgType) ) {
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pConn);
} else {
// parsing OK
// it's a response
STaosRsp *pRsp = (STaosRsp *)msg;
int32_t code = htonl(pRsp->code);
// internal communication is based on TAOS protocol, a trick here to make it efficient
if (pHeader->spi) msgLen -= sizeof(STaosDigest);
msgLen -= (int)sizeof(STaosHeader);
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL;
if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
schedMsg.msg = NULL; // connection shall be closed
} else {
pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
}
taosAddConnToIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
if (code == TSDB_CODE_NOT_MASTER) {
pContext->terrno = code;
taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
} else {
rpcFreeMsg(rpcGetMsgFromCont(pContext->cont)); // free the request msg
(*(pRpc->fp))(pHeader->msgType, pHeader->content, msgLen, pContext->ahandle);
}
pChann = pServer->channList + pConn->chann;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
}
return pConn;
}
int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
STaosHeader *pHeader;
SMsgNode * pMsgNode;
char * msg;
int msgLen = 0;
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer;
SRpcChann * pChann;
uint8_t msgType;
if (pConn == NULL) return -1;
if (pConn->signature != pConn) return -1;
SRpcConn *rpcGetConnToServer(void *shandle, SRpcIpSet ipSet) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
pServer = pConn->pServer;
pChann = pServer->channList + pConn->chann;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
pHeader->destIp = pConn->peerIp;
msg = (char *)pHeader;
SRpcConn *pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[index], pRpc->peerPort, pRpc->meterId);
if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
if ( pConn == NULL ) {
SRpcConnInit connInit;
memset(&connInit, 0, sizeof(connInit));
connInit.sid = 0;
connInit.spi = pRpc->spi;
connInit.encrypt = pRpc->encrypt;
connInit.meterId = pRpc->user;
connInit.peerId = 0;
connInit.shandle = pRpc;
connInit.peerIp = ipstr;
connInit.peerPort = pRpc->peerPort;
pConn = rpcOpenConn(&connInit);
}
contLen = taosCompressRpcMsg(pCont, contLen);
return pConn;
}
msgLen = contLen + (int32_t)sizeof(STaosHeader);
int taosAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
STaosDigest *pDigest = (STaosDigest *)(pCont + contLen);
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(STaosDigest);
msgLen += sizeof(SRpcDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
taosBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
pthread_mutex_lock(&pChann->mutex);
msgType = pHeader->msgType;
return msgLen;
}
if ((msgType & 1U) == 0) {
// response
pConn->inType = 0;
tfree(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
int rpcSendDataToPeer(SRpcConn *pConn, char *data, int dataLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)data;
int code = 0;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
dataLen = taosAddAuthPart(pConn, data, dataLen);
if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
} else {
// request
pMsgNode = (SMsgNode *)(pCont - sizeof(STaosHeader) - sizeof(SMsgNode));
pMsgNode->msgLen = msgLen;
pMsgNode->next = NULL;
pMsgNode->ahandle = ahandle;
if (pConn->outType) {
if (pConn->pTail) {
pConn->pTail->next = pMsgNode;
pConn->pTail = pMsgNode;
} else {
pConn->pTail = pMsgNode;
pConn->pHead = pMsgNode;
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
}
tTrace("%s cid:%d sid:%d id:%s, msg:%s is put into queue pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[msgType], pConn);
msgLen = 0;
writtenLen = (*taosSendData[pRpc->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, dataLen, pConn->chandle);
} else {
assert(pConn->pMsgNode == NULL);
if (pConn->pMsgNode) {
tError("%s cid:%d sid:%d id:%s, bug, there shall be no pengding req pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
if (writtenLen != dataLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
dataLen, writtenLen, strerror(errno));
code = -1;
}
tDump(data, dataLen);
return code;
}
void rpcSendReqToOneServer(SRpcConn *pConn, SRpcReqContext *pContext) {
char *pHeader = rpcHeaderFromCont(pContext->pCont);
SRpcHeader *msg = (char *)pHeader;
int msgLen = rpcGetMsgLen(pContext->contLen);
char msgType = pContext->msgType;
// set the message header
pHeader->version = 1;
pHeader->msgType = msgType;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++;
pHeader->tranId = pConn->tranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->port = 0;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set the connection parameters
pConn->outType = msgType;
pConn->outTranId = pHeader->tranId;
pConn->pMsgNode = pMsgNode;
pConn->rspReceived = 0;
if (pMsgNode->ahandle) pConn->ahandle = pMsgNode->ahandle;
}
}
pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen;
pConn->context = pContext;
if (msgLen) {
taosSendDataToPeer(pConn, (char *)pHeader, msgLen);
if (msgType & 1U) {
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
if ( rpcSendDataToPeer(pConn, msg, msgLen) < 0 ) {
taosReportError(pConn->pContext, terrno);
} else {
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
}
pthread_mutex_unlock(&pChann->mutex);
void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, char *pCont, int contLen, void *ahandle) {
SRpcConn *pConn;
SRpcReqContext *pContext;
return contLen;
contLen = rpcCompressRpcMsg(pCont, contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet;
pContext->contLen = contLen
pContext->pCont = pCont;
pContext->type = type;
pConn = rpcGetConnToServer(shandle, ipSet);
pContext->terrno = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
rpcSendReqToOneServer(pConn, pContext);
return;
}
int taosReSendRspToPeer(SRpcConn *pConn) {
STaosHeader *pHeader;
int writtenLen;
STaosRpc * pServer = pConn->pServer;
void rpcSendResponse(SRpcConn *pConn, char *pCont, int contLen) {
int msgLen = 0;
SRpcConn *pConn;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader;
contLen = rpcCompressRpcMsg(pCont, contLen);
msgLen = rpcMsgLenFromCont(contLen);
pthread_mutex_lock(&pRpc->mutex);
if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0) {
tError("%s cid:%d sid:%d id:%s, rsp is null", pServer->label, pConn->chann, pConn->sid, pConn->meterId);
return -1;
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
// set pConn parameters
pConn->inType = 0;
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_lock(&pRpc->mutex);
rpcSendDataToPeer(pConn, msg, msgLen);
return;
}
static void rpcResendRspToPeer(SRpcConn *pConn) {
if (pConn->pRspMsg == NULL || pConn->rspMsgLen <= 0 || pConn->rspMsgLen <= sizeof(SRpcHeader)) {
tError("%s pConn:%p, rsp is null", pRpc->label);
return;
}
pHeader = (STaosHeader *)pConn->pRspMsg;
if (pHeader->msgLen <= sizeof(SIntMsg) + 1 || pHeader->msgType <= 0) {
tError("%s cid:%d sid:%d id:%s, rsp is null, rspLen:%d, msgType:%d", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pHeader->msgLen, pHeader->msgType);
return -1;
SRpcHeader *pHeader = (SRpcHeader *)pConn->pRspMsg;
if (pHeader->msgType <= 0) {
tError("%s pConn:%p, msgType is messed up, rspLen:%d, msgType:%d", pRpc->label, pConn, pHeader->msgLen, pHeader->msgType);
return;
}
writtenLen =
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, pConn->pRspMsg, pConn->rspMsgLen, pConn->chandle);
rpcSendDataToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen);
}
static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcContext *)param;
if (writtenLen != pConn->rspMsgLen) {
tError("%s cid:%d sid:%d id:%s, failed to re-send %s, reason:%s pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[(int)pHeader->msgType], strerror(errno), pConn);
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
char *rsp = calloc(1, RPC_MSG_OVERHEAD + sizeof(STaosRsp));
if ( rsp ) {
STaosRsp *pRsp = (rsp+sizeof(SRpcHeader));
pRsp->code = pContext->terrno;
(*(pRpc->fp))(pContext->msgType+1, pRsp, sizeof(STaosRsp), pContext->ahandle);
} else {
tTrace("%s cid:%d sid:%d id:%s, msg:%s is re-sent to %s:%hu, len:%d pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, taosMsg[(int)pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
pConn->rspMsgLen, pConn);
tError("%s failed to malloc RSP", pRpc->label);
}
} else {
// move to next IP
pContext->ipSet.index++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
return 0;
pConn = rpcGetConnToServer(pContext->pRpc, pContext->ipSet);
pContext->terrno = terrno;
if (pConn == NULL) taosTmrStart(taosProcessConnError, 0, pContext, pRpc->tmrCtrl);
taosSendReqToOneServer(pConn, pContext);
}
}
void taosProcessTaosTimer(void *param, void *tmrId) {
STaosHeader *pHeader = NULL;
SRpcConn * pConn = (SRpcConn *)param;
int msgLen;
static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
int reportDisc = 0;
if (pConn->signature != param) {
......@@ -1352,124 +1109,43 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
return;
}
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->pTimer != tmrId) {
tTrace("%s cid:%d sid:%d id:%s, timer:%p already processed pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, tmrId, pConn);
tTrace("%s pConn:%p, timer:%p already processed%", pRpc->label, pConn);
return;
}
pthread_mutex_lock(&pChann->mutex);
pthread_mutex_lock(&pRpc->mutex);
if (pConn->rspReceived) {
tTrace("%s cid:%d sid:%d id:%s, rsp just received, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
} else if (pConn->outType == 0) {
tTrace("%s cid:%d sid:%d id:%s, outtype is zero, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
if (pConn->outType == 0) {
tTrace("%s pConn:%p, outtype is zero", pRpc->label, pConn);
} else {
tTrace("%s cid:%d sid:%d id:%s, expected %s is not received, pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[(int)pConn->outType + 1], pConn);
tTrace("%s pConn:%p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL;
pConn->retry++;
if (pConn->retry < 4) {
tTrace("%s cid:%d sid:%d id:%s, re-send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) {
pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode));
pHeader->destId = pConn->peerId;
msgLen = pConn->pMsgNode->msgLen;
if (pConn->spi) {
STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest));
pDigest->timeStamp = htonl(taosGetTimestampSec());
taosBuildAuthHeader((uint8_t *)pHeader, pConn->pMsgNode->msgLen - TSDB_AUTH_LEN, pDigest->auth,
pConn->secret);
}
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
if (pConn->pReqMsg && pConn->pReqMsgLen > 0) {
rpcSendDataToPeer(pConn, pReqMsg, pReqMsgLen);
}
} else {
// close the connection
tTrace("%s cid:%d sid:%d id:%s, failed to send msg:%s to %s:%hu pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
tTrace("%s pConn:%p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
reportDisc = 1;
}
}
}
if (pHeader) {
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer<<pConn->retry, pConn, pChann->tmrCtrl, &pConn->pTimer);
}
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
}
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) {
SRpcConn *pConn = (SRpcConn *)thandle;
*peerId = pConn->peerId;
*peerIp = pConn->peerIp;
*peerPort = pConn->peerPort;
*cid = pConn->chann;
*sid = pConn->sid;
}
int taosGetOutType(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn == NULL) return -1;
return pConn->outType;
}
void taosProcessSchedMsg(SSchedMsg *pMsg) {
SIntMsg * pHeader = (SIntMsg *)pMsg->msg;
SRpcConn *pConn = (SRpcConn *)pMsg->thandle;
if (pConn == NULL || pConn->signature != pMsg->thandle || pConn->pServer == NULL) return;
STaosRpc *pRpc = pConn->pServer;
void *ahandle = (*(pRpc->fp))(pMsg->msg, pMsg->ahandle, pMsg->thandle);
if (ahandle == NULL || pMsg->msg == NULL) {
taosCloseRpcConn(pConn);
} else {
pConn->ahandle = ahandle;
if (pHeader && ((pHeader->msgType & 1) == 0)) taosProcessResponse(pConn);
}
if (pMsg->msg) free(pMsg->msg - sizeof(STaosHeader) + sizeof(SIntMsg));
}
void taosStopRpcConn(void *thandle) {
SRpcConn * pConn = (SRpcConn *)thandle;
STaosRpc * pServer = pConn->pServer;
SRpcChann *pChann = pServer->channList + pConn->chann;
tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
if (pConn->outType) {
pConn->rspReceived = 1;
reportDisc = 1;
pthread_mutex_unlock(&pChann->mutex);
} else {
pthread_mutex_unlock(&pChann->mutex);
taosCloseRpcConn(pConn);
}
pthread_mutex_unlock(&pRpc->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
pConn->terrno = TSDB_CODE_NETWORK_UNAVAIL;
if (reportDisc) taosProcessConnError(pConn->pContext, NULL);
}
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
int ret = -1;
......@@ -1484,7 +1160,7 @@ int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey
return ret;
}
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
MD5Init(&context);
......
/*
* Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <setjmp.h>
#ifndef TDENGINE_TBUFFER_H
#define TDENGINE_TBUFFER_H
/*
SBuffer can be used to read or write a buffer, but cannot be used for both
read & write at a same time. Below is an example:
int main(int argc, char** argv) {
//--------------------- write ------------------------
SBuffer wbuf;
int32_t code = tbufBeginWrite(&wbuf);
if (code != 0) {
// handle errors
return 0;
}
// reserve 1024 bytes for the buffer to improve performance
tbufEnsureCapacity(&wbuf, 1024);
// write 5 integers to the buffer
for (int i = 0; i < 5; i++) {
tbufWriteInt32(&wbuf, i);
}
// write a string to the buffer
tbufWriteString(&wbuf, "this is a string.\n");
// acquire the result and close the write buffer
size_t size = tbufTell(&wbuf);
char* data = tbufGetData(&wbuf, true);
tbufClose(&wbuf, true);
//------------------------ read -----------------------
SBuffer rbuf;
code = tbufBeginRead(&rbuf, data, size);
if (code != 0) {
printf("you will see this message after print out 5 integers and a string.\n");
tbufClose(&rbuf, false);
return 0;
}
// read & print out 5 integers
for (int i = 0; i < 5; i++) {
printf("%d\n", tbufReadInt32(&rbuf));
}
// read & print out a string
printf(tbufReadString(&rbuf, NULL));
// try read another integer, this result in an error as there no this integer
tbufReadInt32(&rbuf);
printf("you should not see this message.\n");
tbufClose(&rbuf, false);
return 0;
}
*/
typedef struct {
jmp_buf jb;
char* data;
size_t pos;
size_t size;
} SBuffer;
// common functions can be used in both read & write
#define tbufThrowError(buf, code) longjmp((buf)->jb, (code))
size_t tbufTell(SBuffer* buf);
size_t tbufSeekTo(SBuffer* buf, size_t pos);
size_t tbufSkip(SBuffer* buf, size_t size);
void tbufClose(SBuffer* buf, bool keepData);
// basic read functions
#define tbufBeginRead(buf, data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((data) == NULL) ? 0 : (len)), setjmp((buf)->jb))
char* tbufRead(SBuffer* buf, size_t size);
void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size);
const char* tbufReadString(SBuffer* buf, size_t* len);
size_t tbufReadToString(SBuffer* buf, char* dst, size_t size);
// basic write functions
#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb))
void tbufEnsureCapacity(SBuffer* buf, size_t size);
char* tbufGetData(SBuffer* buf, bool takeOver);
void tbufWrite(SBuffer* buf, const void* data, size_t size);
void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size);
void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len);
void tbufWriteString(SBuffer* buf, const char* str);
// read & write function for primitive types
#ifndef TBUFFER_DEFINE_FUNCTION
#define TBUFFER_DEFINE_FUNCTION(type, name) \
type tbufRead##name(SBuffer* buf); \
void tbufWrite##name(SBuffer* buf, type data); \
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data);
#endif
TBUFFER_DEFINE_FUNCTION( bool, Bool )
TBUFFER_DEFINE_FUNCTION( char, Char )
TBUFFER_DEFINE_FUNCTION( int8_t, Int8 )
TBUFFER_DEFINE_FUNCTION( uint8_t, Unt8 )
TBUFFER_DEFINE_FUNCTION( int16_t, Int16 )
TBUFFER_DEFINE_FUNCTION( uint16_t, Uint16 )
TBUFFER_DEFINE_FUNCTION( int32_t, Int32 )
TBUFFER_DEFINE_FUNCTION( uint32_t, Uint32 )
TBUFFER_DEFINE_FUNCTION( int64_t, Int64 )
TBUFFER_DEFINE_FUNCTION( uint64_t, Uint64 )
TBUFFER_DEFINE_FUNCTION( float, Float )
TBUFFER_DEFINE_FUNCTION( double, Double )
#endif
\ No newline at end of file
/*
* Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <memory.h>
#include <assert.h>
#define TBUFFER_DEFINE_FUNCTION(type, name) \
type tbufRead##name(SBuffer* buf) { \
type ret; \
tbufReadToBuffer(buf, &ret, sizeof(type)); \
return ret; \
}\
void tbufWrite##name(SBuffer* buf, type data) {\
tbufWrite(buf, &data, sizeof(data));\
}\
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\
tbufWriteAt(buf, pos, &data, sizeof(data));\
}
#include "../inc/tbuffer.h"
////////////////////////////////////////////////////////////////////////////////
// common functions
size_t tbufTell(SBuffer* buf) {
return buf->pos;
}
size_t tbufSeekTo(SBuffer* buf, size_t pos) {
if (pos > buf->size) {
// TODO: update error code, other tbufThrowError need to be changed too
tbufThrowError(buf, 1);
}
size_t old = buf->pos;
buf->pos = pos;
return old;
}
size_t tbufSkip(SBuffer* buf, size_t size) {
return tbufSeekTo(buf, buf->pos + size);
}
void tbufClose(SBuffer* buf, bool keepData) {
if (!keepData) {
free(buf->data);
}
buf->data = NULL;
buf->pos = 0;
buf->size = 0;
}
////////////////////////////////////////////////////////////////////////////////
// read functions
char* tbufRead(SBuffer* buf, size_t size) {
char* ret = buf->data + buf->pos;
tbufSkip(buf, size);
return ret;
}
void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) {
assert(dst != NULL);
// always using memcpy, leave optimization to compiler
memcpy(dst, tbufRead(buf, size), size);
}
const char* tbufReadString(SBuffer* buf, size_t* len) {
uint16_t l = tbufReadUint16(buf);
char* ret = buf->data + buf->pos;
tbufSkip(buf, l + 1);
ret[l] = 0; // ensure the string end with '\0'
if (len != NULL) {
*len = l;
}
return ret;
}
size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) {
assert(dst != NULL);
size_t len;
const char* str = tbufReadString(buf, &len);
if (len >= size) {
len = size - 1;
}
memcpy(dst, str, len);
dst[len] = 0;
return len;
}
////////////////////////////////////////////////////////////////////////////////
// write functions
void tbufEnsureCapacity(SBuffer* buf, size_t size) {
size += buf->pos;
if (size > buf->size) {
size_t nsize = size + buf->size;
char* data = realloc(buf->data, nsize);
if (data == NULL) {
tbufThrowError(buf, 2);
}
buf->data = data;
buf->size = nsize;
}
}
char* tbufGetData(SBuffer* buf, bool takeOver) {
char* ret = buf->data;
if (takeOver) {
buf->pos = 0;
buf->size = 0;
buf->data = NULL;
}
return ret;
}
void tbufEndWrite(SBuffer* buf) {
free(buf->data);
buf->data = NULL;
buf->pos = 0;
buf->size = 0;
}
void tbufWrite(SBuffer* buf, const void* data, size_t size) {
assert(data != NULL);
tbufEnsureCapacity(buf, size);
memcpy(buf->data + buf->pos, data, size);
buf->pos += size;
}
void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) {
assert(data != NULL);
// this function can only be called to fill the gap on previous writes,
// so 'pos + size <= buf->pos' must be true
assert(pos + size <= buf->pos);
memcpy(buf->data + pos, data, size);
}
void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) {
// maximum string length is 65535, if longer string is required
// this function and the corresponding read function need to be
// revised.
assert(len <= 0xffff);
tbufWriteUint16(buf, (uint16_t)len);
tbufWrite(buf, str, len + 1);
}
void tbufWriteString(SBuffer* buf, const char* str) {
tbufWriteStringLen(buf, str, strlen(str));
}
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
cmake_minimum_required(VERSION 2.8)
ADD_SUBDIRECTORY(detail)
\ No newline at end of file
project(tsdb)
add_subdirectory(common)
add_subdirectory(tsdb)
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
list(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/src/vnodePeer.c)
message(STATUS "Common source file ${SOURCE_LIST}")
add_library(common ${SOURCE_LIST})
target_include_directories(common PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc")
......@@ -51,7 +51,7 @@ SISchema tdConvertSchemaToInline(SSchema *pSchema) {
char *pName = TD_ISCHEMA_COL_NAMES(pISchema);
for (int32_t i = 0; i < totalCols; i++) {
SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i);
char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i), i);
char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i));
TD_COLUMN_NAME(pCol) = pName;
......
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
message(STATUS "tsdb source files: ${SOURCE_LIST}")
add_library(tsdb STATIC ${SOURCE_LIST})
target_link_libraries(tsdb common)
target_include_directories(tsdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc")
\ No newline at end of file
......@@ -8,7 +8,7 @@
#include <stdbool.h>
#include <stdint.h>
#include "cache.h"
// #include "cache.h"
#include "schema.h"
#define TSDB_VERSION_MAJOR 1
......@@ -18,6 +18,21 @@ typedef void tsdb_repo_t; // use void to hide implementation details from ou
typedef int32_t table_id_t; // table ID type in this repository
typedef int16_t tsdb_id_t; // TSDB repository ID
// Submit message
typedef struct {
int32_t numOfTables;
char data[];
} SSubmitMsg;
// Submit message for one table
typedef struct {
table_id_t tableId; // table ID to insert
int32_t sversion; // data schema version
int32_t numOfRows; // number of rows data
int64_t uid; // table UID to insert
char data[];
} SSubmitBlock;
// Retention policy.
typedef struct {
// TODO: Need a more fancy description
......@@ -54,7 +69,7 @@ typedef struct {
SDataShardPolicy dataShardPolicy;
SBlockRowsPolicy blockRowsPolicy;
SRetentionPolicy retentionPlicy; // retention configuration
SCachePool * cachePool; // the cache pool the repository to use
void * cachePool; // the cache pool the repository to use
} STSDBCfg;
// the TSDB repository info
......@@ -205,6 +220,9 @@ typedef struct STimeWindow {
int64_t ekey;
} STimeWindow;
typedef struct {
} SColumnFilterInfo;
// query condition to build vnode iterator
typedef struct STSDBQueryCond {
STimeWindow twindow;
......@@ -237,6 +255,10 @@ typedef struct STableIDList {
int32_t num;
} STableIDList;
typedef struct {
} SFields;
/**
* Get the data block iterator, starting from position according to the query condition
* @param pRepo the TSDB repository to query on
......
......@@ -3,7 +3,7 @@
#include <stdint.h>
#include "cache.h"
// #include "cache.h"
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */
......@@ -13,11 +13,11 @@ typedef struct {
int32_t numOfRows // numOfRows
} STableCacheInfo;
typedef struct {
char *pData;
STableCacheInfo *pTableInfo;
SCacheBlock *prev;
SCacheBlock *next;
typedef struct _tsdb_cache_block {
char * pData;
STableCacheInfo * pTableInfo;
struct _tsdb_cache_block *prev;
struct _tsdb_cache_block *next;
} STSDBCacheBlock;
// Use a doublely linked list to implement this
......@@ -28,7 +28,6 @@ typedef struct STSDBCache {
void * current;
} SCacheHandle;
// ---- Operation on STSDBCacheBlock
#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData)
#define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData))
......
#if !defined(_TD_TSDB_FILE_H_)
#define _TD_TSDB_FILE_H_
#include "tstring.h"
#include <stdint.h>
// #include "tstring.h"
typedef int32_t file_id_t;
......@@ -24,7 +25,7 @@ typedef struct {
} SFileInfo;
typedef struct {
tstring_t fname;
char * fname;
SFileInfo fInfo;
} SFILE;
......
......@@ -4,7 +4,7 @@
#include <pthread.h>
#include "taosdef.h"
// #include "taosdef.h"
// Initially, there are 4 tables
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
......@@ -30,7 +30,7 @@ typedef struct STable {
// For TSDB_SUPER_TABLE, it is the schema including tags
// For TSDB_NTABLE, it is only the schema, not including tags
// For TSDB_STABLE, it is NULL
SVSchema *pSchema;
SSchema *pSchema;
// Tag value for this table
// For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL
......@@ -75,7 +75,7 @@ typedef struct {
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
SVSchema *tsdbGetTableSchema(STable *pTable);
SSchema *tsdbGetTableSchema(STable *pTable);
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
......
......@@ -2,14 +2,15 @@
#include <stdint.h>
#include <stdlib.h>
#include "taosdef.h"
// #include "taosdef.h"
// #include "disk.h"
#include "tsdb.h"
#include "tsdbCache.h"
#include "tsdbMeta.h"
typedef struct STSDBRepo {
// TSDB configuration
STSDBcfg *pCfg;
STSDBCfg *pCfg;
// The meter meta handle of this TSDB repository
SMetaHandle *pMetaHandle;
......@@ -18,12 +19,12 @@ typedef struct STSDBRepo {
SCacheHandle *pCacheHandle;
// Disk tier handle for multi-tier storage
SDiskTier *pDiskTier;
void *pDiskTier;
// File Store
void *pFileStore;
pthread_mutext_t tsdbMutex;
pthread_mutex_t tsdbMutex;
} STSDBRepo;
......
#include "tsdbFile.h"
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type){
char *suffix = tsdbFileSuffix[type];
// char *suffix = tsdbFileSuffix[type];
// TODO
}
\ No newline at end of file
#include <stdlib.h>
#include "taosdef.h"
// #include "taosdef.h"
#include "tsdb.h"
#include "tsdbMeta.h"
SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) {
......@@ -11,7 +12,7 @@ SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) {
pMetahandle->numOfTables = 0;
pMetahandle->numOfSuperTables = 0;
pMetahandle->pTables = calloc(sizeof(STable *) * numOfTables);
pMetahandle->pTables = calloc(sizeof(STable *), numOfTables);
if (pMetahandle->pTables == NULL) {
free(pMetahandle);
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册