Commit 7f53f2d1 authored by: Haojun Liao

[td-11818] merge 3.0.

@@ -68,6 +68,14 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
// types can be added here
//
HEARTBEAT_TYPE_MAX
} EHbType;
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
@@ -147,7 +155,7 @@ typedef struct {
typedef struct {
SClientHbKey connKey;
-SHashObj* info; // hash<Slv.key, Sklv>
+SHashObj* info; // hash<Skv.key, Skv>
} SClientHbReq;
typedef struct {
@@ -173,7 +181,10 @@ static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
}
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
-void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
+void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
@@ -182,14 +193,17 @@ static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
-void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
+void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
-taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
+//taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
free(pReq);
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
@@ -220,6 +234,7 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf;
}
typedef struct {
int32_t vgId;
char* dbName;
@@ -359,6 +374,31 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
return buf;
}
typedef struct SMqHbRsp {
int8_t status; //idle or not
int8_t vnodeChanged;
int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pRsp->status);
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
buf = taosDecodeFixedI8(buf, &pRsp->status);
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
return buf;
}
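A minimal round-trip sketch for the SMqHbRsp codec above, assuming the taosEncode*/taosDecode* helpers follow the count-then-write convention used elsewhere in this commit (a NULL buffer pointer only computes the size; a real cursor writes); the variable names are illustrative:
SMqHbRsp rsp = {.status = 0, .vnodeChanged = 0, .epChanged = 0};
int tlen = taosEncodeSMqHbRsp(NULL, &rsp);   // first pass: only compute the encoded size
void* buf = malloc(tlen);
void* cursor = buf;
taosEncodeSMqHbRsp(&cursor, &rsp);           // second pass: write the fields into buf
SMqHbRsp decoded = {0};
taosDecodeSMqHbRsp(buf, &decoded);           // read the fields back from the start of buf
free(buf);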
typedef struct {
int32_t acctId;
int64_t clusterId;
@@ -993,6 +1033,13 @@ typedef struct {
uint64_t taskId;
} SSinkDataReq;
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SQueryContinueReq;
typedef struct {
SMsgHead header;
...
@@ -129,7 +129,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
-TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", NULL, NULL)
+TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
@@ -170,6 +170,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
...
@@ -22,9 +22,18 @@ extern "C" {
#include "trpc.h"
enum {
NODE_TYPE_VNODE = 1,
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
};
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
-uint32_t maxResCacheNum;
+uint32_t maxTaskNum;
uint32_t maxSchTaskNum;
} SQWorkerCfg;
@@ -39,11 +48,17 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
-int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
+int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
...
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tarray.h"
#include "thash.h"
#include "tmsg.h"
#define HEARTBEAT_INTERVAL 1500 // ms
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
// types can be added here
//
HEARTBEAT_TYPE_MAX
} EHbType;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
int8_t inited;
// ctl
int8_t threadStop;
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// mq
void hbMgrInitMqHbRspHandle();
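A condensed, hypothetical usage sketch of the heartbeat manager API declared above, with one SAppHbMgr per cluster as the comments indicate; transporter, epSet and connId are placeholders supplied by the caller:
hbMgrInit();                                               // process-level init, called once by mgmt
SAppHbMgr* pAppHbMgr = appHbMgrInit(transporter, epSet);   // one manager per cluster
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_MQ};
hbRegisterConn(pAppHbMgr, connKey, NULL);                  // NULL: no FGetConnInfo callback in this sketch
// ... the connection is now reported in each heartbeat batch ...
hbDeregisterConn(pAppHbMgr, connKey);
appHbMgrCleanup(pAppHbMgr);
hbMgrCleanUp();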
@@ -31,6 +31,41 @@ extern "C" {
#include "trpc.h"
#include "query.h"
#define HEARTBEAT_INTERVAL 1500 // ms
typedef struct SAppInstInfo SAppInstInfo;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr {
int8_t inited;
// ctl
int8_t threadStop;
pthread_t thread;
pthread_mutex_t lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
// TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
int64_t parsed; // start to parse
@@ -55,15 +90,15 @@ typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo;
-typedef struct SAppInstInfo {
+struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList *pConnList; // STscObj linked list
int64_t clusterId;
void *pTransporter;
-SHeartBeatInfo hb;
+struct SAppHbMgr *pAppHbMgr;
-} SAppInstInfo;
+};
typedef struct SAppInfo {
int64_t startTime;
@@ -81,12 +116,17 @@ typedef struct STscObj {
char db[TSDB_DB_FNAME_LEN];
int32_t acctId;
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo *pAppInfo;
} STscObj;
typedef struct SMqConsumer {
STscObj* pTscObj;
} SMqConsumer;
typedef struct SReqResultInfo {
const char *pRspMsg;
const char *pData;
@@ -169,6 +209,26 @@ void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// --- mq
void hbMgrInitMqHbRspHandle();
#ifdef __cplusplus
}
#endif
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "clientHb.h"
+#include "clientInt.h"
#include "trpc.h"
static SClientHbMgr clientHbMgr = {0};
@@ -21,10 +21,18 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
-static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) {
+static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) {
return 0;
}
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
if (code != 0) {
return -1;
}
SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData;
return hbMqHbRspHandle(pRsp);
}
void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
@@ -35,18 +43,18 @@ static FORCE_INLINE void hbMgrInitHandle() {
}
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
-SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq));
-if (pReq == NULL) {
+SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
+if (pBatchReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
-pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
+pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
-taosArrayPush(pReq->reqs, pIter);
SClientHbReq* pOneReq = pIter;
+taosArrayPush(pBatchReq->reqs, pOneReq);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
@@ -59,10 +67,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
taosHashCopyKey(pIter, &connKey);
getConnInfoFp(connKey, NULL);
-pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
+pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
}
-return pReq;
+return pBatchReq;
}
static void* hbThreadFunc(void* param) {
@@ -75,20 +83,48 @@ static void* hbThreadFunc(void* param) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for(int i = 0; i < sz; i++) {
-SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i);
+SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
-SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
-void* reqStr = NULL;
-int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq);
-SMsgSendInfo info;
-/*info.fp = hbHandleRsp;*/
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
continue;
}
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
if (pReq == NULL) {
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
void *buf = malloc(tlen);
if (buf == NULL) {
//TODO: error handling
break;
}
void *bufCopy = buf;
tSerializeSClientHbBatchReq(&bufCopy, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
free(buf);
break;
}
pInfo->fp = hbMqAsyncCallBack;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = NULL;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0;
-asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info);
+SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
+asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
taosMsleep(HEARTBEAT_INTERVAL);
}
return NULL;
}
@@ -110,7 +146,8 @@ static void hbStopThread() {
atomic_store_8(&clientHbMgr.threadStop, 1);
}
-SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
+SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
+hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -119,16 +156,27 @@ SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
-// init connection info
-pAppHbMgr->transporter = transporter;
-pAppHbMgr->epSet = epSet;
+// init app info
+pAppHbMgr->pAppInstInfo = pAppInstInfo;
// init hash info
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->activeInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(pAppHbMgr);
return NULL;
}
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->getInfoFuncs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(pAppHbMgr);
return NULL;
}
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
return pAppHbMgr;
}
@@ -138,7 +186,7 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
-SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i);
+SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
taosHashCleanup(pTarget->getInfoFuncs);
@@ -171,7 +219,6 @@ void hbMgrCleanUp() {
if (old == 0) return;
taosArrayDestroy(clientHbMgr.appHbMgrs);
}
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
...
@@ -117,6 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
@@ -259,6 +260,101 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
}
typedef struct tmq_t tmq_t;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
// statistics
int64_t consumeCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
//connection info
int32_t vgId;
SEpSet epSet;
} SMqClientTopic;
typedef struct tmq_resp_err_t {
int32_t code;
} tmq_resp_err_t;
typedef struct tmq_topic_vgroup_list_t {
char* topicName;
int32_t vgId;
int64_t committedOffset;
} tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
typedef struct tmq_conf_t{
char* clientId;
char* groupId;
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
} tmq_conf_t;
struct tmq_t {
char groupId[256];
char clientId[256];
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
SArray* clientTopics; // SArray<SMqClientTopic>
};
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commit_cb = cb;
}
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
tmq_t* pTmq = (void*)param;
SArray* pArray = taosArrayInit(0, sizeof(SKv));
if (pArray == NULL) {
return NULL;
}
SKv kv = {0};
kv.key = malloc(256);
if (kv.key == NULL) {
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.key, "groupId");
kv.keyLen = strlen("groupId") + 1;
kv.value = malloc(256);
if (kv.value == NULL) {
free(kv.key);
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.value, pTmq->groupId);
kv.valueLen = strlen(pTmq->groupId) + 1;
taosArrayPush(pArray, &kv);
strcpy(kv.key, "clientUid");
kv.keyLen = strlen("clientUid") + 1;
*(uint32_t*)kv.value = pTmq->pTscObj->connId;
kv.valueLen = sizeof(uint32_t);
return NULL;
}
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
strcpy(pTmq->groupId, conf->groupId);
strcpy(pTmq->clientId, conf->clientId);
pTmq->pTscObj = (STscObj*)conn;
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
return pTmq;
}
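A hypothetical consumer-setup sketch using the structs and functions above; the configuration values are placeholders, and tmq_consume_poll/tmq_message_destroy are the stubs added a few hunks below:
tmq_conf_t conf = {0};
conf.groupId = "demo-group";                     // placeholder group id
conf.clientId = "demo-client";                   // placeholder client id
tmq_conf_set_offset_commit_cb(&conf, NULL);      // no commit callback in this sketch
TAOS* conn = taos_connect("localhost", "root", "taosdata", NULL, 0);
tmq_t* tmq = tmqCreateConsumerImpl(conn, &conf); // marks the connection as HEARTBEAT_TYPE_MQ
tmq_message_t* msg = tmq_consume_poll(tmq, 0);   // currently a stub that returns NULL
tmq_message_destroy(msg);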
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj *pTscObj = (STscObj*)taos;
SRequestObj *pRequest = NULL;
@@ -351,6 +447,25 @@ _return:
return pRequest;
}
typedef struct tmq_message_t {
int32_t numOfRows;
char* topicName;
TAOS_ROW row[];
} tmq_message_t;
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
return NULL;
}
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
return NULL;
}
void tmq_message_destroy(tmq_message_t* mq_message) {
}
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) {
@@ -708,4 +823,4 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p
pResultInfo->completed = (pRsp->completed == 1);
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}
\ No newline at end of file
@@ -71,6 +71,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns);
@@ -379,4 +382,4 @@ void initMsgHandleFp() {
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
}
\ No newline at end of file
@@ -48,13 +48,14 @@ int main(int argc, char** argv) {
TEST(testCase, driverInit_Test) { taos_init(); }
-//TEST(testCase, connect_Test) {
-// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
-// if (pConn == NULL) {
-// printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
-// }
-// taos_close(pConn);
-//}
+TEST(testCase, connect_Test) {
+TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+if (pConn == NULL) {
+printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
+}
+sleep(3);
+taos_close(pConn);
+}
//TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@@ -148,36 +149,27 @@ TEST(testCase, driverInit_Test) { taos_init(); }
//}
//
//TEST(testCase, create_db_Test) {
-// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
-// assert(pConn != NULL);
-//
-// TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
-// if (taos_errno(pRes) != 0) {
-// printf("failed to drop database, reason:%s\n", taos_errstr(pRes));
-// }
-//
-// taos_free_result(pRes);
-//
-// pRes = taos_query(pConn, "create database abc1 vgroups 2");
-// if (taos_errno(pRes) != 0) {
-// printf("error in create db, reason:%s\n", taos_errstr(pRes));
-// }
-//
-// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
-// ASSERT_TRUE(pFields == NULL);
-//
-// int32_t numOfFields = taos_num_fields(pRes);
-// ASSERT_EQ(numOfFields, 0);
-//
-// taos_free_result(pRes);
-//
-// pRes = taos_query(pConn, "create database if not exists abc1 vgroups 4");
-// if (taos_errno(pRes) != 0) {
-// printf("failed to create database abc1, reason:%s\n", taos_errstr(pRes));
-// }
-//
-// taos_free_result(pRes);
-// taos_close(pConn);
+//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+//assert(pConn != NULL);
+//TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
+//if (taos_errno(pRes) != 0) {
+//printf("error in create db, reason:%s\n", taos_errstr(pRes));
+//}
+//TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+//ASSERT_TRUE(pFields == NULL);
+//int32_t numOfFields = taos_num_fields(pRes);
+//ASSERT_EQ(numOfFields, 0);
+//taos_free_result(pRes);
+//pRes = taos_query(pConn, "create database abc1 vgroups 4");
+//if (taos_errno(pRes) != 0) {
+//printf("error in create db, reason:%s\n", taos_errstr(pRes));
+//}
+//taos_close(pConn);
//}
//
//TEST(testCase, create_dnode_Test) {
...
@@ -89,7 +89,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
-int kvNum = taosHashGetSize(pReq->info);
+int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv;
void* pIter = taosHashIterate(pReq->info, pIter);
@@ -104,14 +104,15 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
return tlen;
}
-void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
+void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
+ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling
-int kvNum;
-taosDecodeFixedI32(buf, &kvNum);
-pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+int32_t kvNum;
+buf = taosDecodeFixedI32(buf, &kvNum);
+if (pReq->info == NULL) {
+pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+}
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
@@ -121,12 +122,69 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
return buf;
}
-int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) {
+int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey);
tlen += taosEncodeFixedI32(buf, pRsp->status);
tlen += taosEncodeFixedI32(buf, pRsp->bodyLen);
tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen);
return tlen;
}
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) {
buf = taosDecodeSClientHbKey(buf, &pRsp->connKey);
buf = taosDecodeFixedI32(buf, &pRsp->status);
buf = taosDecodeFixedI32(buf, &pRsp->bodyLen);
buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen);
return buf;
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pBatchReq) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pBatchReq->reqId);
int32_t reqNum = taosArrayGetSize(pBatchReq->reqs);
tlen += taosEncodeFixedI32(buf, reqNum);
for (int i = 0; i < reqNum; i++) {
SClientHbReq* pReq = taosArrayGet(pBatchReq->reqs, i);
tlen += tSerializeSClientHbReq(buf, pReq);
}
return tlen;
}
-void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) {
+void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) {
buf = taosDecodeFixedI64(buf, &pBatchReq->reqId);
if (pBatchReq->reqs == NULL) {
pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq));
}
int32_t reqNum;
buf = taosDecodeFixedI32(buf, &reqNum);
for (int i = 0; i < reqNum; i++) {
SClientHbReq req = {0};
buf = tDeserializeSClientHbReq(buf, &req);
taosArrayPush(pBatchReq->reqs, &req);
}
return buf;
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp) {
int tlen = 0;
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
tlen += taosEncodeFixedI32(buf, sz);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(pBatchRsp->rsps, i);
tlen += tSerializeSClientHbRsp(buf, pRsp);
}
return tlen;
}
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
SClientHbRsp rsp = {0};
buf = tDeserializeSClientHbRsp(buf, &rsp);
taosArrayPush(pBatchRsp->rsps, &rsp);
}
return buf;
}
...
@@ -258,6 +258,39 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
}
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0};
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
SArray *pArray = batchReq.reqs;
int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
SClientHbReq* pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp rsp = {
.status = 0,
.connKey = pHbReq->connKey,
.bodyLen = 0,
.body = NULL
};
taosArrayPush(batchRsp.rsps, &rsp);
}
}
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen);
void* bufCopy = buf;
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
pReq->contLen = tlen;
pReq->pCont = buf;
return 0;
#if 0
SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
@@ -327,6 +360,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
pReq->contLen = sizeof(SConnectRsp);
pReq->pCont = pRsp;
return 0;
#endif
}
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
...
@@ -96,6 +96,38 @@ TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
}
TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchReq batchReq;
batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq));
SClientHbReq req = {0};
req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ};
req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
SKv kv;
kv.key = (void*)"abc";
kv.keyLen = 4;
kv.value = (void*)"bcd";
kv.valueLen = 4;
taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen);
taosArrayPush(batchReq.reqs, &req);
int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq);
void* buf = (SClientHbBatchReq*)rpcMallocCont(tlen);
void* bufCopy = buf;
tSerializeSClientHbBatchReq(&bufCopy, &batchReq);
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, buf, tlen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
char* pRspChar = (char*)pMsg->pCont;
SClientHbBatchRsp rsp = {0};
tDeserializeSClientHbBatchRsp(pRspChar, &rsp);
int sz = taosArrayGetSize(rsp.rsps);
ASSERT_EQ(sz, 1);
SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
EXPECT_EQ(pRsp->connKey.connId, 123);
EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
EXPECT_EQ(pRsp->status, 0);
#if 0
int32_t contLen = sizeof(SHeartBeatReq);
SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
@@ -129,9 +161,12 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
#endif
}
TEST_F(MndTestProfile, 05_KillConnMsg) {
// temporary remove since kill will use new heartbeat msg
#if 0
{
int32_t contLen = sizeof(SKillConnReq);
@@ -190,6 +225,7 @@ TEST_F(MndTestProfile, 05_KillConnMsg) {
connId = pRsp->connId;
}
#endif
}
TEST_F(MndTestProfile, 06_KillConnMsg_InvalidConn) {
@@ -204,6 +240,8 @@ TEST_F(MndTestProfile, 06_KillConnMsg_InvalidConn) {
}
TEST_F(MndTestProfile, 07_KillQueryMsg) {
// temporary remove since kill will use new heartbeat msg
#if 0
{
int32_t contLen = sizeof(SKillQueryReq);
@@ -252,6 +290,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) {
EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
}
#endif
}
TEST_F(MndTestProfile, 08_KillQueryMsg_InvalidConn) {
...
@@ -22,6 +22,7 @@ extern "C" {
#include "qworker.h"
#include "vnode.h"
typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode);
...
@@ -19,11 +19,22 @@
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
+int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
-vTrace("query message is processed");
+vTrace("query message is processing");
-return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_SCHEDULE_DATA_SINK:
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -182,6 +182,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (NULL == pDispatcher->nextOutput.pData) {
assert(pDispatcher->queryEnd);
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
...
@@ -178,8 +178,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
-*handle = pTaskInfo->dsHandle;
+if (handle) {
+*handle = pTaskInfo->dsHandle;
+}
while(1) {
st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
...
@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if (pLogicPlan->info.type != QNODE_MODIFY) {
-// char* str = NULL;
-// queryPlanToString(pLogicPlan, &str);
-// printf("%s\n", str);
+char* str = NULL;
+queryPlanToString(pLogicPlan, &str);
+printf("%s\n", str);
}
code = optimizeQueryPlan(pLogicPlan);
...
@@ -23,7 +23,7 @@ extern "C" {
#include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
-#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
+#define QWORKER_DEFAULT_TASK_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum {
@@ -57,7 +57,6 @@ enum {
QW_ADD_ACQUIRE,
};
typedef struct SQWTaskStatus {
SRWLatch lock;
int32_t code;
@@ -67,12 +66,15 @@ typedef struct SQWTaskStatus {
bool drop;
} SQWTaskStatus;
-typedef struct SQWorkerTaskHandlesCache {
+typedef struct SQWTaskCtx {
SRWLatch lock;
+int8_t sinkScheduled;
+int8_t queryScheduled;
bool needRsp;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
-} SQWorkerTaskHandlesCache;
+} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
@@ -82,11 +84,15 @@ typedef struct SQWSchStatus {
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
-SRWLatch schLock;
-SRWLatch resLock;
-SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
-SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
+int8_t nodeType;
+int32_t nodeId;
+SRWLatch schLock;
+SRWLatch ctxLock;
+SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
+SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
+void *nodeObj;
+putReqToQueryQFp putToQueueFp;
} SQWorkerMgmt;
#define QW_GOT_RES_DATA(data) (true)
@@ -94,41 +100,69 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
-#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
+#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
-#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
+#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
-#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
+#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
+#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
-if ((*(_lock)) < 0) assert(0); \
+assert(atomic_load_32((_lock)) >= 0); \
+qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
-qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
+qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+assert(atomic_load_32((_lock)) > 0); \
} else { \
-if ((*(_lock)) < 0) assert(0); \
+assert(atomic_load_32((_lock)) >= 0); \
+qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
-qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
+qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
-if ((*(_lock)) <= 0) assert(0); \
+assert(atomic_load_32((_lock)) > 0); \
+qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
-qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
+qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+assert(atomic_load_32((_lock)) >= 0); \
} else { \
-if ((*(_lock)) <= 0) assert(0); \
+assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
+qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
-qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
+qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
-static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
+int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
+int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
+int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task);
#ifdef __cplusplus
...
This diff is collapsed.
@@ -42,6 +42,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
}
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
@@ -258,7 +263,7 @@ TEST(seqTest, normalCase) {
stubSetStringToPlan();
stubSetRpcSendResponse();
-code = qWorkerInit(NULL, &mgmt);
+code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
@@ -328,7 +333,7 @@ TEST(seqTest, cancelFirst) {
stubSetStringToPlan();
stubSetRpcSendResponse();
-code = qWorkerInit(NULL, &mgmt);
+code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
@@ -402,7 +407,7 @@ TEST(seqTest, randCase) {
srand(time(NULL));
-code = qWorkerInit(NULL, &mgmt);
+code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
int32_t t = 0;
@@ -446,7 +451,7 @@ TEST(seqTest, multithreadRand) {
srand(time(NULL));
-code = qWorkerInit(NULL, &mgmt);
+code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;
...
@@ -36,11 +36,31 @@ enum {
SCH_WRITE,
};
typedef struct SSchApiStat {
} SSchApiStat;
typedef struct SSchRuntimeStat {
} SSchRuntimeStat;
typedef struct SSchJobStat {
} SSchJobStat;
typedef struct SSchedulerStat {
SSchApiStat api;
SSchRuntimeStat runtime;
SSchJobStat job;
} SSchedulerStat;
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
+SSchedulerStat stat;
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
...
@@ -1462,34 +1462,37 @@ void scheduleFreeJob(void *job) {
}
SSchJob *pJob = job;
+uint64_t queryId = pJob->queryId;
-if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
-SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
-return;
-}
+if (SCH_GET_JOB_STATUS(pJob) > 0) {
+if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
+SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
+return;
+}
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
usleep(1);
} else {
assert(0);
}
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
+}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
@@ -1515,6 +1518,8 @@ void scheduleFreeJob(void *job) {
tfree(pJob->res);
tfree(pJob);
+qDebug("QID:%"PRIx64" job freed", queryId);
}
void schedulerDestroy(void) {
...
@@ -79,6 +79,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan->level = 1;
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
scanPlan->msgType = TDMT_VND_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.templateId = 0x4444444444;
@@ -89,6 +90,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan->msgType = TDMT_VND_QUERY;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
...@@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { ...@@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
} }
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
}
void schtSetPlanToString() { void schtSetPlanToString() {
static Stub stub; static Stub stub;
...@@ -190,6 +197,20 @@ void schtSetExecNode() { ...@@ -190,6 +197,20 @@ void schtSetExecNode() {
} }
} }
void schtSetRpcSendRequest() {
static Stub stub;
stub.set(rpcSendRequest, schtRpcSendRequest);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
for (const auto& f : result) {
stub.set(f.second, schtRpcSendRequest);
}
}
}
void *schtSendRsp(void *param) { void *schtSendRsp(void *param) {
SSchJob *job = NULL; SSchJob *job = NULL;
int32_t code = 0; int32_t code = 0;
......
...@@ -27,4 +27,11 @@ if (${BUILD_WITH_UV}) ...@@ -27,4 +27,11 @@ if (${BUILD_WITH_UV})
add_definitions(-DUSE_UV) add_definitions(-DUSE_UV)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
...@@ -21,10 +21,6 @@ ...@@ -21,10 +21,6 @@
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
#define RPC_CONN_TCP 2 #define RPC_CONN_TCP 2
extern int tsRpcOverhead; extern int tsRpcOverhead;
...@@ -75,7 +71,6 @@ typedef struct { ...@@ -75,7 +71,6 @@ typedef struct {
} SRpcDigest; } SRpcDigest;
#pragma pack(pop) #pragma pack(pop)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param); void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
...@@ -35,8 +33,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -35,8 +33,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,15 +16,65 @@ ...@@ -16,15 +16,65 @@
#ifndef _TD_TRANSPORT_INT_H_ #ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_
#include "rpcHead.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV #ifdef USE_UV
#else #include <stddef.h>
typedef void *queue[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Initialize an empty queue. */
#define QUEUE_INIT(q) \
{ \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
}
/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e) \
{ \
QUEUE_NEXT(e) = (q); \
QUEUE_PREV(e) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(e) = (e); \
QUEUE_PREV(q) = (e); \
}
/* Remove the given element from the queue. Any element can be removed at any *
* time. */
#define QUEUE_REMOVE(e) \
{ \
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))
/* Iterate over the element of a queue. * Mutating the queue while iterating
* results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
#endif  // USE_UV
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
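The block above ports libuv's intrusive queue: a node embeds a `queue` field, the macros link nodes through that field, and `QUEUE_DATA` recovers the enclosing struct from the link. A minimal usage sketch, assuming the `queue` typedef and `QUEUE_*` macros from transportInt.h are in scope; `SMsgNode`, `pending`, and `queueDemo` are illustrative names, not part of this commit:

```c
// Illustrative use of the intrusive queue macros above (not part of this commit).
#include <stdio.h>

typedef struct SMsgNode {
  int   payload;
  queue node;   // embedded link; QUEUE_DATA() maps it back to the SMsgNode
} SMsgNode;

static void queueDemo(void) {
  queue pending;                   // list head
  QUEUE_INIT(&pending);

  SMsgNode a = {.payload = 1};
  SMsgNode b = {.payload = 2};
  QUEUE_PUSH(&pending, &a.node);   // insert at the back
  QUEUE_PUSH(&pending, &b.node);

  queue* h;
  QUEUE_FOREACH(h, &pending) {
    SMsgNode* msg = QUEUE_DATA(h, SMsgNode, node);
    printf("payload=%d\n", msg->payload);   // prints 1 then 2
  }

  QUEUE_REMOVE(&a.node);           // any element can be unlinked in place
}
```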
...@@ -22,9 +22,6 @@ ...@@ -22,9 +22,6 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#ifdef USE_UV
#else
typedef struct SConnHash { typedef struct SConnHash {
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
...@@ -295,4 +292,3 @@ static void rpcUnlockCache(int64_t *lockedBy) { ...@@ -295,4 +292,3 @@ static void rpcUnlockCache(int64_t *lockedBy) {
assert(false); assert(false);
} }
} }
#endif
...@@ -13,9 +13,6 @@ ...@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h" #include "lz4.h"
#include "os.h" #include "os.h"
#include "rpcCache.h" #include "rpcCache.h"
...@@ -30,11 +27,23 @@ ...@@ -30,11 +27,23 @@
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "tmsg.h" #include "tmsg.h"
#include "transportInt.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
int tsRpcMaxUdpSize = 15000; // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
#ifndef USE_UV
typedef struct { typedef struct {
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages int numOfThreads; // number of threads to process incoming messages
...@@ -50,235 +59,21 @@ typedef struct { ...@@ -50,235 +59,21 @@ typedef struct {
char secret[TSDB_PASSWORD_LEN]; // secret for the link char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key char ckey[TSDB_PASSWORD_LEN]; // ciphering key
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
void* parent; void * parent;
void* idPool; // handle to ID pool void * idPool; // handle to ID pool
void* tmrCtrl; // handle to timer void * tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility SHashObj * hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization void * tcphandle; // returned handle from TCP initialization
void* udphandle; // returned handle from UDP initialization void * udphandle; // returned handle from UDP initialization
void* pCache; // connection cache void * pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct SRpcConn* connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
#ifdef USE_UV
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
typedef struct SThreadObj {
pthread_t thread;
uv_pipe_t* pipe;
uv_loop_t* loop;
uv_async_t* workerAsync; //
int fd;
} SThreadObj;
typedef struct SServerObj {
uv_tcp_t server;
uv_loop_t* loop;
int workerIdx;
int numOfThread;
SThreadObj** pThreadObj;
uv_pipe_t** pipe;
} SServerObj;
typedef struct SConnCtx {
uv_tcp_t* pClient;
uv_timer_t* pTimer;
uv_async_t* pWorkerAsync;
int ref;
} SConnCtx;
static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void onTimeout(uv_timer_t* handle);
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void onWrite(uv_write_t* req, int status);
static void onAccept(uv_stream_t* stream, int status);
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void workerAsyncCB(uv_async_t* handle);
static void* workerThread(void* arg);
int32_t rpcInit() { return -1; }
void rpcCleanup() { return; };
void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) {
return NULL;
}
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThread = pRpc->numOfThreads;
srv->workerIdx = 0;
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
uv_loop_init(srv->loop);
for (int i = 0; i < srv->numOfThread; i++) {
srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
return NULL;
}
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
srv->pThreadObj[i]->fd = fds[0];
srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read
int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i]));
if (err == 0) {
tError("sucess to create worker thread %d", i);
// printf("thread %d create\n", i);
} else {
tError("failed to create worker thread %d", i);
return NULL;
}
}
uv_tcp_init(srv->loop, &srv->server);
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr);
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
int err = 0;
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
tError("Listen error %s\n", uv_err_name(err));
return NULL;
}
uv_run(srv->loop, UV_RUN_DEFAULT);
return pRpc;
}
void rpcClose(void* arg) { return; }
void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
void onTimeout(uv_timer_t* handle) {
// opt
tDebug("time out");
}
void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
tDebug("data already was read on a stream");
}
void onWrite(uv_write_t* req, int status) {
// opt
if (req) tDebug("data already was written on stream");
}
void workerAsyncCB(uv_async_t* handle) {
// opt
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
}
void onAccept(uv_stream_t* stream, int status) {
if (status == -1) {
return;
}
SServerObj* pObj = container_of(stream, SServerObj, server);
tDebug("new conntion accepted by main server, dispatch to one worker thread");
uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_buf_t buf = uv_buf_init("a", 1);
    // dispatch to worker thread
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
} else {
uv_close((uv_handle_t*)cli, NULL);
}
}
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if (nread < 0) {
if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread));
}
// TODO(log other failure reason)
uv_close((uv_handle_t*)q, NULL);
return;
}
SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe);
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
tError("No pending count");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SConnCtx* pConn = malloc(sizeof(SConnCtx));
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  pConn->pWorkerAsync = pObj->workerAsync;  // thread safety
uv_tcp_init(pObj->loop, pConn->pClient);
if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pClient, &fd);
tDebug("new connection created: %d", fd);
uv_timer_start(pConn->pTimer, onTimeout, 10, 0);
uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead);
} else {
uv_timer_stop(pConn->pTimer);
free(pConn->pTimer);
uv_close((uv_handle_t*)pConn->pClient, NULL);
free(pConn->pClient);
free(pConn);
}
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
int fd = pObj->fd;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, fd);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB);
uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection);
  uv_run(pObj->loop, UV_RUN_DEFAULT);  // drive this worker's event loop
  return NULL;
}
#else
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
typedef struct { typedef struct {
SRpcInfo * pRpc; // associated SRpcInfo SRpcInfo * pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
...@@ -299,6 +94,13 @@ typedef struct { ...@@ -299,6 +94,13 @@ typedef struct {
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
typedef struct SRpcConn { typedef struct SRpcConn {
char info[48]; // debug info: label + pConn + ahandle char info[48]; // debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
...@@ -336,15 +138,6 @@ typedef struct SRpcConn { ...@@ -336,15 +138,6 @@ typedef struct SRpcConn {
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
} SRpcConn; } SRpcConn;
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
int tsRpcMaxUdpSize = 15000; // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
static int tsRpcRefId = -1; static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0; static int32_t tsRpcNum = 0;
// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
...@@ -442,7 +235,8 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -442,7 +235,8 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
if (pRpc->connType == TAOS_CONN_CLIENT) { if (pRpc->connType == TAOS_CONN_CLIENT) {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
......
...@@ -14,9 +14,6 @@ ...@@ -14,9 +14,6 @@
*/ */
#include "rpcTcp.h" #include "rpcTcp.h"
#ifdef USE_UV
#include <uv.h>
#endif
#include "os.h" #include "os.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcLog.h" #include "rpcLog.h"
...@@ -24,9 +21,6 @@ ...@@ -24,9 +21,6 @@
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h" #include "tutil.h"
#ifdef USE_UV
#else
typedef struct SFdObj { typedef struct SFdObj {
void * signature; void * signature;
SOCKET fd; // TCP socket FD SOCKET fd; // TCP socket FD
...@@ -665,5 +659,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -665,5 +659,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
#endif
...@@ -22,9 +22,6 @@ ...@@ -22,9 +22,6 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#ifdef USE_UV
// no support upd currently
#else
#define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000 #define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_UDP_BUF_TIME 5 // mseconds
...@@ -260,4 +257,3 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c ...@@ -260,4 +257,3 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret; return ret;
} }
#endif
add_executable(transportTest "")
target_sources(transportTest
PRIVATE
"transportTests.cc"
)
target_include_directories(transportTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (transportTest
os
util
common
gtest_main
transport
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include "transportInt.h"
#include "trpc.h"
using namespace std;
int main() {
SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5};
void* p = rpcOpen(&init);
while (1) {
std::cout << "cron task" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000));
}
}
...@@ -36,7 +36,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { ...@@ -36,7 +36,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
char* limit; char* limit;
if (nlen == 0 || hlen < nlen) { if (nlen == 0 || hlen < nlen) {
return false; return NULL;
} }
limit = haystack + hlen - nlen + 1; limit = haystack + hlen - nlen + 1;
...@@ -54,10 +54,12 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -54,10 +54,12 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL); ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet); int sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0); ASSERT(sz > 0);
#if 0
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
} }
#endif
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1); SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
...@@ -143,8 +145,6 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -143,8 +145,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo; SWalFileInfo fileInfo;
memset(&fileInfo, -1, sizeof(SWalFileInfo)); memset(&fileInfo, -1, sizeof(SWalFileInfo));
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
//get lastVer
//get size
taosArrayPush(pLogInfoArray, &fileInfo); taosArrayPush(pLogInfoArray, &fileInfo);
} }
} }
...@@ -158,60 +158,51 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -158,60 +158,51 @@ int walCheckAndRepairMeta(SWal* pWal) {
oldSz = taosArrayGetSize(pWal->fileInfoSet); oldSz = taosArrayGetSize(pWal->fileInfoSet);
} }
int newSz = taosArrayGetSize(pLogInfoArray); int newSz = taosArrayGetSize(pLogInfoArray);
// case 1. meta file not exist / cannot be parsed
if (oldSz < newSz) { if (oldSz > newSz) {
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
} else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) { for (int i = oldSz; i < newSz; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
pWal->writeCur = newSz - 1;
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(pLogInfoArray);
return -1;
}
} }
taosArrayDestroy(pLogInfoArray);
// case 2. versions in meta not match log
// or some log not included in meta
// (e.g. program killed)
//
// case 3. other corrupt cases
//
#if 0
int sz = taosArrayGetSize(pLogInfoArray);
for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) {
//repair
}
if (i > 0) {
SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1);
if (pLastFileInfo->lastVer != pFileInfo->firstVer) {
pWal->writeCur = newSz - 1;
if (newSz > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz-1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
struct stat statbuf;
stat(fnameStr, &statbuf);
if (oldSz != newSz || pLastFileInfo->fileSize != statbuf.st_size) {
pLastFileInfo->fileSize = statbuf.st_size;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(pLogInfoArray);
return -1;
} }
} }
} }
#endif
// get last version of this file //TODO: set fileSize and lastVer if necessary
//
// rebuild meta
taosArrayDestroy(pLogInfoArray);
return 0; return 0;
} }
int walCheckAndRepairIdx(SWal* pWal) { int walCheckAndRepairIdx(SWal* pWal) {
// iterate all idx files // TODO: iterate all log files
// check first and last entry of each idx file valid // if idx not found, scan log and write idx
// if found, check complete by first and last entry of each idx file
// if idx incomplete, binary search last valid entry, and then build other part
return 0; return 0;
} }
......
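The TODO comments in `walCheckAndRepairIdx` only outline the intended repair flow. Below is a rough control-flow sketch of that outline; every helper (`idxFileExists`, `scanLogAndRewriteIdx`, `findLastValidIdxEntry`, `appendIdxFromVer`) is hypothetical and does not exist in this commit, and the `SWal`/`SWalFileInfo` definitions are assumed to be in scope from the WAL module:

```c
// Control-flow sketch only: mirrors the TODO comments in walCheckAndRepairIdx.
// None of the helpers below exist in this commit; they only name the steps.
static int     idxFileExists(SWal* pWal, int64_t firstVer);                        // placeholder
static int     scanLogAndRewriteIdx(SWal* pWal, SWalFileInfo* pInfo);              // placeholder
static int64_t findLastValidIdxEntry(SWal* pWal, SWalFileInfo* pInfo);             // placeholder
static int     appendIdxFromVer(SWal* pWal, SWalFileInfo* pInfo, int64_t fromVer); // placeholder

static int walCheckAndRepairIdxSketch(SWal* pWal) {
  int sz = taosArrayGetSize(pWal->fileInfoSet);
  for (int i = 0; i < sz; i++) {
    SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);

    if (!idxFileExists(pWal, pInfo->firstVer)) {
      // idx not found: scan the log file and rewrite the idx from scratch
      if (scanLogAndRewriteIdx(pWal, pInfo) < 0) return -1;
      continue;
    }

    // idx found: check completeness via its first and last entries
    int64_t lastValid = findLastValidIdxEntry(pWal, pInfo);
    if (lastValid < pInfo->lastVer) {
      // idx incomplete: lastValid would come from a binary search over the idx file;
      // rebuild the missing tail from the log
      if (appendIdxFromVer(pWal, pInfo, lastValid + 1) < 0) return -1;
    }
  }
  return 0;
}
```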
...@@ -187,13 +187,19 @@ void *threadFunc(void *param) { ...@@ -187,13 +187,19 @@ void *threadFunc(void *param) {
int64_t curMs = 0; int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs(); int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs; pInfo->startMs = beginMs;
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int64_t t = pInfo->tableBeginIndex;
int64_t batch = (pInfo->tableEndIndex - t); for (; t <= pInfo->tableEndIndex;) {
batch = MIN(batch, batchNum); //int64_t batch = (pInfo->tableEndIndex - t);
//batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table"); int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batch; ++i) { for (int32_t i = 0; i < batchNum;) {
len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t);
t++;
i++;
if (t > pInfo->tableEndIndex) {
break;
}
} }
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
...@@ -212,11 +218,11 @@ void *threadFunc(void *param) { ...@@ -212,11 +218,11 @@ void *threadFunc(void *param) {
curMs = taosGetTimestampMs(); curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) { if (curMs - beginMs > 10000) {
beginMs = curMs; beginMs = curMs;
//printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printCreateProgress(pInfo, t); printCreateProgress(pInfo, t);
} }
t += (batch - 1);
} }
printCreateProgress(pInfo, pInfo->tableEndIndex); printCreateProgress(pInfo, t);
} }
if (insertData) { if (insertData) {
......
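For reference, the rewritten loop in `threadFunc` assembles one batched `create table` statement per round, naming each table `<stb>_t<index>`. A standalone sketch of that statement format, with `stbName` and `batchNum` chosen arbitrarily for illustration:

```c
// Standalone sketch of the batched "create table" statement built above.
// stbName and batchNum are illustrative; the real loop also checks tableEndIndex.
#include <inttypes.h>
#include <stdio.h>

int main(void) {
  char        qstr[1024];
  const char *stbName  = "stb";   // illustrative super-table name
  int         batchNum = 3;       // illustrative batch size
  int64_t     t        = 0;

  int len = sprintf(qstr, "create table");
  for (int32_t i = 0; i < batchNum; ++i, ++t) {
    len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t);
  }
  // e.g.: create table stb_t0 using stb tags(0) stb_t1 using stb tags(1) stb_t2 using stb tags(2)
  printf("%s\n", qstr);
  return 0;
}
```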