提交 f20b026a 编写于 作者: H hzcheng

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

...@@ -4,11 +4,11 @@ PROJECT(TDengine) ...@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(os)
ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(util)
ADD_SUBDIRECTORY(rpc) ADD_SUBDIRECTORY(rpc)
ADD_SUBDIRECTORY(client) #ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(kit) #ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(plugins) #ADD_SUBDIRECTORY(plugins)
ADD_SUBDIRECTORY(sdb) #ADD_SUBDIRECTORY(sdb)
ADD_SUBDIRECTORY(mnode) #ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(dnode) #ADD_SUBDIRECTORY(dnode)
ADD_SUBDIRECTORY(vnode) #ADD_SUBDIRECTORY(vnode)
ADD_SUBDIRECTORY(connector/jdbc) #ADD_SUBDIRECTORY(connector/jdbc)
...@@ -4,8 +4,10 @@ PROJECT(TDengine) ...@@ -4,8 +4,10 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(jni) INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
AUX_SOURCE_DIRECTORY(./src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
......
...@@ -30,6 +30,7 @@ extern "C" { ...@@ -30,6 +30,7 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h" #include "tutil.h"
#include "trpc.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
...@@ -324,6 +325,7 @@ typedef struct _sql_obj { ...@@ -324,6 +325,7 @@ typedef struct _sql_obj {
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * thandle; void * thandle;
SRpcIpSet ipSet;
void * pStream; void * pStream;
void * pSubscription; void * pSubscription;
char * sqlstr; char * sqlstr;
...@@ -371,12 +373,6 @@ typedef struct _sstream { ...@@ -371,12 +373,6 @@ typedef struct _sstream {
struct _sstream *prev, *next; struct _sstream *prev, *next;
} SSqlStream; } SSqlStream;
typedef struct {
char numOfIps;
uint32_t ip[TSDB_MAX_MGMT_IPS];
char ipstr[TSDB_MAX_MGMT_IPS][TSDB_IPv4ADDR_LEN];
} SIpStrList;
// tscSql API // tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
...@@ -461,7 +457,7 @@ extern void * tscQhandle; ...@@ -461,7 +457,7 @@ extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize; extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SIpStrList tscMgmtIpList; extern SRpcIpSet tscMgmtIpList;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "tscSQLParser.h" #include "tscSQLParser.h"
#include "tutil.h" #include "tutil.h"
#include "tnote.h" #include "tnote.h"
#include "tsched.h"
static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......
...@@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) { ...@@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) {
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
taosStopRpcConn(pSql->thandle); //taosStopRpcConn(pSql->thandle);
pSql = pSql->next; pSql = pSql->next;
} }
......
...@@ -31,10 +31,14 @@ ...@@ -31,10 +31,14 @@
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
SIpStrList tscMgmtIpList; SRpcIpSet tscMgmtIpList;
int tsMasterIndex = 0; int tsMasterIndex = 0;
int tsSlaveIndex = 1; int tsSlaveIndex = 1;
//temp
SRpcIpSet tscMgmtIpSet;
SRpcIpSet tscDnodeIpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
...@@ -53,7 +57,7 @@ void tscPrintMgmtIp() { ...@@ -53,7 +57,7 @@ void tscPrintMgmtIp() {
tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps); tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
} else { } else {
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) { for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]);
} }
} }
} }
...@@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { ...@@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
tscMgmtIpList.numOfIps = pIpList->numOfIps; tscMgmtIpList.numOfIps = pIpList->numOfIps;
if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
for (int i = 0; i < pIpList->numOfIps; ++i) { for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i]; tscMgmtIpList.ip[i] = pIpList->ip[i];
} }
tscTrace("cluster mgmt IP list:"); tscTrace("cluster mgmt IP list:");
...@@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { ...@@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
void tscSetMgmtIpListFromEdge() { void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpList.numOfIps != 2) { if (tscMgmtIpList.numOfIps != 2) {
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("edge mgmt IP list:"); tscTrace("edge mgmt IP list:");
tscPrintMgmtIp(); tscPrintMgmtIp();
...@@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if (tscShouldFreeHeatBeat(pObj->pHb)) { if (tscShouldFreeHeatBeat(pObj->pHb)) {
tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle);
taosCloseRpcConn(pObj->pHb->thandle); //taosCloseRpcConn(pObj->pHb->thandle);
tscFreeSqlObj(pObj->pHb); tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj); tscCloseTscObj(pObj);
...@@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb); tscProcessSql(pObj->pHb);
} }
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj; STscObj *pTscObj = pSql->pTscObj;
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
...@@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { ...@@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
if (thandle == NULL) {
SRpcConnInit connInit; // if (thandle == NULL) {
memset(&connInit, 0, sizeof(connInit)); // SRpcConnInit connInit;
connInit.cid = 0; // memset(&connInit, 0, sizeof(connInit));
connInit.sid = 0; // connInit.cid = 0;
connInit.meterId = pSql->pTscObj->user; // connInit.sid = 0;
connInit.peerId = 0; // connInit.meterId = pSql->pTscObj->user;
connInit.shandle = pTscMgmtConn; // connInit.peerId = 0;
connInit.ahandle = pSql; // connInit.shandle = pTscMgmtConn;
connInit.peerPort = tsMgmtShellPort; // connInit.ahandle = pSql;
connInit.spi = 1; // connInit.peerPort = tsMgmtShellPort;
connInit.encrypt = 0; // connInit.spi = 1;
connInit.secret = pSql->pTscObj->pass; // connInit.encrypt = 0;
// connInit.secret = pSql->pTscObj->pass;
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; //
thandle = taosOpenRpcConn(&connInit, pCode); // connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
} // thandle = taosOpenRpcConn(&connInit, pCode);
// }
pSql->thandle = thandle; pSql->thandle = thandle;
pSql->ip = tscMgmtIpList.ip[pSql->index]; pSql->ip = tscMgmtIpList.ip[pSql->index];
...@@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { ...@@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
void *thandle = void *thandle =
taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user); taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);
if (thandle == NULL) { // if (thandle == NULL) {
SRpcConnInit connInit; // SRpcConnInit connInit;
tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); // tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
memset(&connInit, 0, sizeof(connInit)); // memset(&connInit, 0, sizeof(connInit));
connInit.cid = vidIndex; // connInit.cid = vidIndex;
connInit.sid = 0; // connInit.sid = 0;
connInit.spi = 0; // connInit.spi = 0;
connInit.encrypt = 0; // connInit.encrypt = 0;
connInit.meterId = pSql->pTscObj->user; // connInit.meterId = pSql->pTscObj->user;
connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); // connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
connInit.shandle = pVnodeConn; // connInit.shandle = pVnodeConn;
connInit.ahandle = pSql; // connInit.ahandle = pSql;
connInit.peerIp = ipstr; // connInit.peerIp = ipstr;
connInit.peerPort = tsVnodeShellPort; // connInit.peerPort = tsVnodeShellPort;
thandle = taosOpenRpcConn(&connInit, pCode); // thandle = taosOpenRpcConn(&connInit, pCode);
vidIndex = (vidIndex + 1) % tscNumOfThreads; // vidIndex = (vidIndex + 1) % tscNumOfThreads;
} // }
pSql->thandle = thandle; pSql->thandle = thandle;
pSql->ip = pVPeersDesc[pSql->index].ip; pSql->ip = pVPeersDesc[pSql->index].ip;
...@@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { ...@@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
//TODO fetch from vpeerdesc
pSql->ipSet = tscMgmtIpSet;
break; break;
} }
...@@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) {
size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest);
// the memory will be released by taosProcessResponse, so no memory leak here // the memory will be released by taosProcessResponse, so no memory leak here
char *buf = malloc(totalLen); char *pStart = rpcMallocCont(pSql->cmd.payloadLen);
if (NULL == buf) { if (NULL == pStart) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
memcpy(buf, pSql->cmd.payload, totalLen); memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
if (pStart) { if (pStart) {
/* /*
* this SQL object may be released by other thread due to the completion of this query even before the log * this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable. * is dumped to log file. So the signature needs to be kept in a local variable.
*/ */
uint64_t signature = (uint64_t)pSql->signature; uint64_t signature = (uint64_t)pSql->signature;
if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
int ret;
if (pSql->cmd.command < TSDB_SQL_MGMT)
ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
else
ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) { if (ret >= 0) {
code = 0; code = 0;
} }
......
...@@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
} }
if (ip && ip[0]) { if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 4; tscMgmtIpList.numOfIps = 3;
strcpy(tscMgmtIpList.ipstr[0], ip); strcpy(tscMgmtIpList.ipStr[0], ip);
tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[0] = inet_addr(ip);
strcpy(tscMgmtIpList.ipstr[1], ip); strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(ip); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[2], tsMasterIp); strcpy(tscMgmtIpList.ipStr[2], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); tscMgmtIpList.index = 0;
tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); tscMgmtIpList.port = tsMgmtShellPort;
} }
pObj = (STscObj *)malloc(sizeof(STscObj)); pObj = (STscObj *)malloc(sizeof(STscObj));
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tsched.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
......
...@@ -24,8 +24,9 @@ ...@@ -24,8 +24,9 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
// global, not configurable // global, not configurable
void * pVnodeConn; void * pVnodeConn;
void * pVMeterConn; void * pVMeterConn;
...@@ -94,18 +95,17 @@ void taos_init_imp() { ...@@ -94,18 +95,17 @@ void taos_init_imp() {
if (tsTscEnableRecordSql != 0) { if (tsTscEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); tscMgmtIpList.index = 0;
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1;
strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) { if (tsSecondIp[0]) {
tscMgmtIpList.numOfIps = 3; tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[2], tsSecondIp); strcpy(tscMgmtIpList.ipStr[1], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
} }
tscInitMsgs(); tscInitMsgs();
...@@ -132,42 +132,23 @@ void taos_init_imp() { ...@@ -132,42 +132,23 @@ void taos_init_imp() {
rpcInit.label = "TSC-vnode"; rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads; rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.fp = tscProcessMsgFromServer; rpcInit.fp = tscProcessMsgFromServer;
rpcInit.bits = 20; rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.numOfChanns = tscNumOfThreads;
rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 0;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.qhandle = tscQhandle; pVnodeConn = rpcOpen(&rpcInit);
pVnodeConn = taosOpenRpc(&rpcInit);
if (pVnodeConn == NULL) { if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode"); tscError("failed to init connection to vnode");
return; return;
} }
for (int i = 0; i < tscNumOfThreads; ++i) {
int retVal = taosOpenRpcChann(pVnodeConn, i, rpcInit.sessionsPerChann);
if (0 != retVal) {
tError("TSC-vnode, failed to open rpc chann");
taosCloseRpc(pVnodeConn);
return;
}
}
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp; rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt"; rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.fp = tscProcessMsgFromServer; rpcInit.fp = tscProcessMsgFromServer;
rpcInit.bits = 20; rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.numOfChanns = 1;
rpcInit.sessionsPerChann = tsMaxMgmtConnections;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 0;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.qhandle = tscQhandle; pTscMgmtConn = rpcOpen(&rpcInit);
pTscMgmtConn = taosOpenRpc(&rpcInit);
if (pTscMgmtConn == NULL) { if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt"); tscError("failed to init connection to mgmt");
return; return;
...@@ -183,7 +164,7 @@ void taos_init_imp() { ...@@ -183,7 +164,7 @@ void taos_init_imp() {
if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime); if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, taosCloseRpcConn, tscTmr, tsShellActivityTimer * 1000); tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
initialized = 1; initialized = 1;
tscTrace("client is initialized successfully"); tscTrace("client is initialized successfully");
......
...@@ -26,7 +26,7 @@ extern "C" { ...@@ -26,7 +26,7 @@ extern "C" {
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
#define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)}, #define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)},
#else #else
#define TAOS_DEFINE_ERROR(name, mod, code, msg) const int32_t name = (0x80000000 | ((mod)<<16) | (code)); #define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code));
#endif #endif
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
......
...@@ -688,6 +688,7 @@ typedef struct { ...@@ -688,6 +688,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnode; //the ID of dnode int32_t dnode; //the ID of dnode
int32_t vnode; //the index of vnode int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc; } SVPeerDesc;
typedef struct { typedef struct {
......
...@@ -21,6 +21,7 @@ extern "C" { ...@@ -21,6 +21,7 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_UDPS 0 #define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1 #define TAOS_CONN_UDPC 1
...@@ -37,38 +38,46 @@ extern "C" { ...@@ -37,38 +38,46 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
void *(*fp)(int8_t type, void *pCont, int32_t contLen, void *handle, int32_t index); // function to process the incoming msg
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
int (*afp) (char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); // call back to retrieve auth info
} SRpcInit;
typedef struct { typedef struct {
int16_t index; int16_t index;
int16_t numOfIps; int16_t numOfIps;
uint16_t port; uint16_t port;
uint32_t ip[TSDB_MAX_MPEERS]; uint32_t ip[TSDB_MAX_MPEERS];
char ipStr[TSDB_MAX_MPEERS][TSDB_IPv4ADDR_LEN];
} SRpcIpSet; } SRpcIpSet;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client security only
char *meterId; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
// call back to process incoming msg
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
// call back to process notify the ipSet changes
void (*ufp)(void *ahandle, SRpcIpSet ipSet);
// call back to retrieve the client auth info
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey);
} SRpcInit;
void *rpcOpen(SRpcInit *pRpc); void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet ipSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, void *pCont, int contLen); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendSimpleRsp(void *pConn, int32_t code); void rpcSendRedirectRsp(void *pConn, SRpcIpSet ipSet);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -23,3 +23,6 @@ ENDIF () ...@@ -23,3 +23,6 @@ ENDIF ()
ADD_LIBRARY(trpc ${SRC}) ADD_LIBRARY(trpc ${SRC})
TARGET_LINK_LIBRARIES(trpc tutil) TARGET_LINK_LIBRARIES(trpc tutil)
ADD_SUBDIRECTORY(test)
...@@ -16,10 +16,18 @@ ...@@ -16,10 +16,18 @@
#ifndef _rpc_hash_ip_header_ #ifndef _rpc_hash_ip_header_
#define _rpc_hash_ip_header_ #define _rpc_hash_ip_header_
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenIpHash(int maxSessions); void *taosOpenIpHash(int maxSessions);
void taosCloseIpHash(void *handle); void taosCloseIpHash(void *handle);
void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port);
void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port);
void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port);
#ifdef __cplusplus
}
#endif
#endif #endif
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
#ifndef _taos_tcp_client_header_ #ifndef _taos_tcp_client_header_
#define _taos_tcp_client_header_ #define _taos_tcp_client_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h" #include "taosdef.h"
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
...@@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 ...@@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16
void taosCloseTcpClientConnection(void *chandle); void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif #endif
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
#ifndef _taos_tcp_server_header_ #ifndef _taos_tcp_server_header_
#define _taos_tcp_server_header_ #define _taos_tcp_server_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h" #include "taosdef.h"
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
...@@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param); ...@@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param); void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif #endif
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
#ifndef _taos_udp_header_ #ifndef _taos_udp_header_
#define _taos_udp_header_ #define _taos_udp_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h" #include "taosdef.h"
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
...@@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd); ...@@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd);
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); void taosInitMsgHdr(void **hdr, void *dest, int maxPkts);
void taosSetMsgHdrData(void *hdr, char *data, int dataLen); void taosSetMsgHdrData(void *hdr, char *data, int dataLen);
#ifdef __cplusplus
}
#endif
#endif #endif
此差异已折叠。
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(../inc)
AUX_SOURCE_DIRECTORY(./ TEST_SRC)
ADD_EXECUTABLE(rpcTest ${TEST_SRC})
TARGET_LINK_LIBRARIES(rpcTest trpc)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>
int32_t main(int32_t argc, char *argv[]) {
dPrint("unit test for rpc module");
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.numOfThreads = 1;
rpcInit.fp = NULL;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = 2000;
void *pConn = rpcOpen(&rpcInit);
if (pConn != NULL) {
dPrint("conection is opened");
} else {
dError("failed to initialize rpc");
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册