提交 76b206fe 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'ddd' into sche

...@@ -213,6 +213,7 @@ endif(${BUILD_WITH_TRAFT}) ...@@ -213,6 +213,7 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV # LIBUV
if(${BUILD_WITH_UV}) if(${BUILD_WITH_UV})
add_compile_options(-Wno-sign-compare)
if (${TD_WINDOWS}) if (${TD_WINDOWS})
file(READ "libuv/include/uv.h" CONTENTS) file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
......
...@@ -1942,13 +1942,14 @@ typedef struct { ...@@ -1942,13 +1942,14 @@ typedef struct {
} SVCreateTSmaReq; } SVCreateTSmaReq;
typedef struct { typedef struct {
int8_t type; // 0 status report, 1 update data int8_t type; // 0 status report, 1 update data
char indexName[TSDB_INDEX_NAME_LEN]; // int64_t indexUid;
STimeWindow windows; int64_t skey; // start TS key of interval/sliding window
} STSmaMsg; } STSmaMsg;
typedef struct { typedef struct {
int64_t ver; // use a general definition int64_t ver; // use a general definition
int64_t indexUid;
char indexName[TSDB_INDEX_NAME_LEN]; char indexName[TSDB_INDEX_NAME_LEN];
} SVDropTSmaReq; } SVDropTSmaReq;
...@@ -2345,9 +2346,9 @@ struct SRpcMsg; ...@@ -2345,9 +2346,9 @@ struct SRpcMsg;
struct SEpSet; struct SEpSet;
struct SMgmtWrapper; struct SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg); typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -190,6 +190,7 @@ enum { ...@@ -190,6 +190,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
...@@ -29,6 +29,7 @@ typedef struct SMgmtWrapper SMgmtWrapper; ...@@ -29,6 +29,7 @@ typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;
typedef struct { typedef struct {
int32_t reserved;
} SSnodeLoad; } SSnodeLoad;
typedef struct { typedef struct {
......
...@@ -52,8 +52,8 @@ typedef struct { ...@@ -52,8 +52,8 @@ typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t rspLen; int32_t rspLen;
void *pRsp; void * pRsp;
void *pNode; void * pNode;
} SNodeMsg; } SNodeMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
...@@ -87,7 +87,15 @@ typedef struct { ...@@ -87,7 +87,15 @@ typedef struct {
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
SHashObj *args; int32_t msgType;
void * val;
int32_t len;
void (*free)(void *arg);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj * args;
SRpcBrokenlinkVal brokenVal;
} SRpcCtx; } SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();
......
...@@ -460,7 +460,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -460,7 +460,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL; SQuery* pQueryNode = NULL;
char* pStr = NULL; char* astStr = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) { if (taos == NULL || topicName == NULL || sql == NULL) {
...@@ -489,17 +489,17 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -489,17 +489,17 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
// todo check for invalid sql statement and return with error code // todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return); CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/ /*printf("%s\n", pStr);*/
SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T }; SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb); strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, topicName); strcpy(name.tname, topicName);
SCMCreateTopicReq req = { SCMCreateTopicReq req = {
.igExists = 1, .igExists = 1,
.ast = (char*)pStr, .ast = (char*)astStr,
.sql = (char*)sql, .sql = (char*)sql,
}; };
tNameExtractFullName(&name, req.name); tNameExtractFullName(&name, req.name);
...@@ -513,7 +513,11 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -513,7 +513,11 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tSerializeSCMCreateTopicReq(buf, tlen, &req); tSerializeSCMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_TOPIC; pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
......
...@@ -2716,12 +2716,14 @@ int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) { ...@@ -2716,12 +2716,14 @@ int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver); tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeFixedI64(buf, pReq->indexUid);
tlen += taosEncodeString(buf, pReq->indexName); tlen += taosEncodeString(buf, pReq->indexName);
return tlen; return tlen;
} }
void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver)); buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeFixedI64(buf, &(pReq->indexUid));
buf = taosDecodeStringTo(buf, pReq->indexName); buf = taosDecodeStringTo(buf, pReq->indexName);
return buf; return buf;
......
...@@ -148,4 +148,5 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -148,4 +148,5 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg);
} }
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType;
typedef struct SVnodesMgmt { typedef struct SVnodesMgmt {
SHashObj *hash; SHashObj *hash;
SRWLatch latch; SRWLatch latch;
...@@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); ...@@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
......
...@@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { ...@@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
vnodeOpt.nthreads = tsNumOfCommitThreads; vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue; vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue;
vnodeOpt.sendReqFp = dndSendReqToDnode; vnodeOpt.sendReqFp = dndSendReqToDnode;
vnodeOpt.sendMnodeReqFp = dndSendReqToMnode;
vnodeOpt.sendRspFp = dndSendRsp;
if (vnodeInit(&vnodeOpt) != 0) { if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr()); dError("failed to init vnode since %s", terrstr());
goto _OVER; goto _OVER;
......
...@@ -265,6 +265,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -265,6 +265,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
......
...@@ -16,46 +16,109 @@ ...@@ -16,46 +16,109 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp);
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
}
if (msgType & 1u) {
if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode query queue", pMsg); dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
} }
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode fetch queue", pMsg); dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
} }
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("msg:%p, will be processed in vnode write queue", pMsg);
void *ptr = taosArrayPush(pArray, &pMsg); dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
assert(ptr != NULL); if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
} }
vnodeProcessWMsgs(pVnode->pImpl, pArray); vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) { numOfMsgs = taosArrayGetSize(pArray);
SRpcMsg *pRsp = NULL; for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); SRpcMsg *pRsp = NULL;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle; pRsp->ahandle = pRpc->ahandle;
dndSendRsp(pVnode->pWrapper, pRsp); dndSendRsp(pVnode->pWrapper, pRsp);
free(pRsp); free(pRsp);
} else { } else {
if (code != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; vmSendRsp(pVnode->pWrapper, pMsg, code);
dndSendRsp(pVnode->pWrapper, &rpcRsp);
} }
} }
for (size_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
...@@ -89,93 +152,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf ...@@ -89,93 +152,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf
} }
} }
static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) {
SRpcMsg *pMsg = &pNodeMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
return -1;
} }
return pVnode; switch (qtype) {
} case VND_QUERY_QUEUE:
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { code = taosWriteQitem(pVnode->pQueryQ, pMsg);
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); break;
if (pVnode == NULL) return -1; case VND_FETCH_QUEUE:
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case VND_WRITE_QUEUE:
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
code = taosWriteQitem(pVnode->pWriteQ, pMsg);
case VND_SYNC_QUEUE:
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
code = taosWriteQitem(pVnode->pSyncQ, pMsg);
default:
terrno = TSDB_CODE_INVALID_PARA;
break;
}
int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return code; return code;
} }
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE);
if (pVnode == NULL) return -1; }
int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
vmReleaseVnode(pMgmt, pVnode); return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE);
return code;
} }
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE);
if (pVnode == NULL) return -1;
int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg);
vmReleaseVnode(pMgmt, pVnode);
return code;
} }
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE);
if (pVnode == NULL) return -1; }
int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg); int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
vmReleaseVnode(pMgmt, pVnode); SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
return code; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
} }
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1;
int32_t code = -1; SMsgHead *pHead = pRpc->pCont;
SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) { if (pMsg != NULL) {
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pQueryQ, pMsg); switch (qtype) {
case VND_QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case VND_FETCH_QUEUE:
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case VND_APPLY_QUEUE:
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
break;
case VND_WRITE_QUEUE:
case VND_SYNC_QUEUE:
default:
terrno = TSDB_CODE_INVALID_PARA;
break;
}
} }
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return code; return code;
} }
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) { int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE);
}
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
if (pVnode == NULL) return -1; return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE);
}
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
if (pMsg != NULL) { return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE);
pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
}
vmReleaseVnode(pMgmt, pVnode);
return code;
} }
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
...@@ -191,6 +273,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -191,6 +273,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
return -1; return -1;
} }
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
return 0; return 0;
} }
...@@ -205,43 +288,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -205,43 +288,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pSyncQ = NULL; pVnode->pSyncQ = NULL;
pVnode->pFetchQ = NULL; pVnode->pFetchQ = NULL;
pVnode->pQueryQ = NULL; pVnode->pQueryQ = NULL;
} dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in mgmt queue", pMsg);
}
if (msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
dndSendRsp(pMgmt->pWrapper, &rsp);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
} }
int32_t vmStartWorker(SVnodesMgmt *pMgmt) { int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
...@@ -275,7 +322,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { ...@@ -275,7 +322,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
if (tWWorkerInit(pWPool) != 0) return -1; if (tWWorkerInit(pWPool) != 0) return -1;
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1; return -1;
} }
...@@ -291,9 +338,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) { ...@@ -291,9 +338,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->syncPool); tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed"); dDebug("vnode workers is closed");
} }
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
}
\ No newline at end of file
...@@ -802,6 +802,32 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -802,6 +802,32 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0; return 0;
} }
static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
SDropDbRsp dropRsp = {0};
if (pDb != NULL) {
memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
dropRsp.uid = pDb->uid;
}
int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp);
void *pRsp = NULL;
if (useRpcMalloc) {
pRsp = rpcMallocCont(rspLen);
} else {
pRsp = malloc(rspLen);
}
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp);
*pRspLen = rspLen;
*ppRsp = pRsp;
return 0;
}
static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg);
...@@ -814,18 +840,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { ...@@ -814,18 +840,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
SDropDbRsp dropRsp = {0}; int32_t rspLen = 0;
memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN); void *pRsp = NULL;
dropRsp.uid = pDb->uid; if (mndBuildDropDbRsp(pDb, &rspLen, &pRsp, false) < 0) goto DROP_DB_OVER;
int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp);
void *pRsp = malloc(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto DROP_DB_OVER;
}
tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp);
mndTransSetRpcRsp(pTrans, pRsp, rspLen); mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
...@@ -854,7 +871,7 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq) { ...@@ -854,7 +871,7 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq) {
pDb = mndAcquireDb(pMnode, dropReq.db); pDb = mndAcquireDb(pMnode, dropReq.db);
if (pDb == NULL) { if (pDb == NULL) {
if (dropReq.ignoreNotExists) { if (dropReq.ignoreNotExists) {
code = 0; code = mndBuildDropDbRsp(pDb, &pReq->rspLen, &pReq->pRsp, true);
goto DROP_DB_OVER; goto DROP_DB_OVER;
} else { } else {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
......
...@@ -77,9 +77,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { ...@@ -77,9 +77,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
} }
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont; SStreamExecMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId; int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
......
...@@ -55,6 +55,8 @@ int tqCommit(STQ*); ...@@ -55,6 +55,8 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -62,6 +62,7 @@ typedef struct { ...@@ -62,6 +62,7 @@ typedef struct {
typedef struct { typedef struct {
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutToQueueFp putToQueryQFp; PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
...@@ -125,9 +126,8 @@ void vnodeDestroy(const char *path); ...@@ -125,9 +126,8 @@ void vnodeDestroy(const char *path);
* *
* @param pVnode The vnode object. * @param pVnode The vnode object.
* @param pMsgs The array of SRpcMsg * @param pMsgs The array of SRpcMsg
* @return int 0 for success, -1 for failure
*/ */
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
/** /**
* @brief Apply a write request message. * @brief Apply a write request message.
......
...@@ -46,15 +46,18 @@ typedef struct SVnodeTask { ...@@ -46,15 +46,18 @@ typedef struct SVnodeTask {
typedef struct SVnodeMgr { typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag; td_mode_flag_t vnodeInitFlag;
// For commit // For commit
bool stop; bool stop;
uint16_t nthreads; uint16_t nthreads;
TdThread* threads; TdThread* threads;
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadCond hasTask; TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue; TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt // For vnode Mgmt
PutToQueueFp putToQueryQFp; PutToQueueFp putToQueryQFp;
SendReqFp sendReqFp; PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
...@@ -85,7 +88,10 @@ struct SVnode { ...@@ -85,7 +88,10 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp);
#define vFatal(...) \ #define vFatal(...) \
do { \ do { \
......
...@@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
//
return 0;
}
...@@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { ...@@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
} }
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) { if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) { ...@@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) {
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp; vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
vnodeMgr.putToFetchQFp = pOption->putToFetchQFp;
vnodeMgr.sendReqFp = pOption->sendReqFp; vnodeMgr.sendReqFp = pOption->sendReqFp;
vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp;
vnodeMgr.sendRspFp = pOption->sendRspFp;
// Start commit handers // Start commit handers
if (pOption->nthreads > 0) { if (pOption->nthreads > 0) {
...@@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) { ...@@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
} }
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) {
terrno = TSDB_CODE_VND_APP_ERROR;
return -1;
}
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq); return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
} }
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) {
(*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
}
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq);
}
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) {
(*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp);
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
......
...@@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) { ...@@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
(putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq); (putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq);
} }
void vnodeQueryClose(SVnode *pVnode) { void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
qWorkerDestroy((void **)&pVnode->pQuery);
}
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing"); vTrace("message in query queue is processing");
...@@ -67,6 +65,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -67,6 +65,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "tq.h" #include "tq.h"
#include "vnd.h" #include "vnd.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc; SRpcMsg *pRpc;
...@@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
// TODO: Integrate RAFT module here // TODO: Integrate RAFT module here
return 0; // No results are returned because error handling is difficult
// return 0;
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
...@@ -154,15 +154,7 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { ...@@ -154,15 +154,7 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
// add more search type // add more search type
} }
char* dst = NULL; ctx->data = strdup((char*)data);
if (data != NULL) {
char* src = (char*)data;
size_t len = strlen(src);
dst = (char*)calloc(1, len * sizeof(char) + 1);
memcpy(dst, src, len);
}
ctx->data = dst;
ctx->type = atype; ctx->type = atype;
ctx->stdata = (void*)sv; ctx->stdata = (void*)sv;
return ctx; return ctx;
......
...@@ -228,8 +228,8 @@ typedef struct SConnBuffer { ...@@ -228,8 +228,8 @@ typedef struct SConnBuffer {
typedef void (*AsyncCB)(uv_async_t* handle); typedef void (*AsyncCB)(uv_async_t* handle);
typedef struct { typedef struct {
void* pThrd; void* pThrd;
queue qmsg; queue qmsg;
TdThreadMutex mtx; // protect qmsg; TdThreadMutex mtx; // protect qmsg;
} SAsyncItem; } SAsyncItem;
...@@ -273,10 +273,52 @@ void transCloseClient(void* arg); ...@@ -273,10 +273,52 @@ void transCloseClient(void* arg);
void transCloseServer(void* arg); void transCloseServer(void* arg);
void transCtxInit(STransCtx* ctx); void transCtxInit(STransCtx* ctx);
void transCtxDestroy(STransCtx* ctx); void transCtxCleanup(STransCtx* ctx);
void transCtxClear(STransCtx* ctx); void transCtxClear(STransCtx* ctx);
void transCtxMerge(STransCtx* dst, STransCtx* src); void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key); void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// queue sending msgs
typedef struct {
SArray* q;
void (*free)(void* arg);
} STransQueue;
/*
* init queue
* note: queue'size is small, default 1
*/
void transQueueInit(STransQueue* queue, void (*free)(void* arg));
/*
* put arg into queue
* if queue'size > 1, return false; else return true
*/
bool transQueuePush(STransQueue* queue, void* arg);
/*
* pop head from queue
*/
void* transQueuePop(STransQueue* queue);
/*
* get head from queue
*/
void* transQueueGet(STransQueue* queue);
/*
* queue empty or not
*/
bool transQueueEmpty(STransQueue* queue);
/*
* clear queue
*/
void transQueueClear(STransQueue* queue);
/*
* destroy queue
*/
void transQueueDestroy(STransQueue* queue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,13 +25,14 @@ typedef struct SCliConn { ...@@ -25,13 +25,14 @@ typedef struct SCliConn {
void* hostThrd; void* hostThrd;
SConnBuffer readBuf; SConnBuffer readBuf;
void* data; void* data;
SArray* cliMsgs; // SArray* cliMsgs;
queue conn; STransQueue cliMsgs;
uint64_t expireTime; queue conn;
int hThrdIdx; uint64_t expireTime;
bool broken; // link broken or not int hThrdIdx;
STransCtx ctx; STransCtx ctx;
bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
int release; // 1: release int release; // 1: release
// spi configure // spi configure
...@@ -56,14 +57,14 @@ typedef struct SCliMsg { ...@@ -56,14 +57,14 @@ typedef struct SCliMsg {
} SCliMsg; } SCliMsg;
typedef struct SCliThrdObj { typedef struct SCliThrdObj {
TdThread thread; TdThread thread;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_timer_t timer; uv_timer_t timer;
void* pool; // conn pool void* pool; // conn pool
// msg queue // msg queue
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
...@@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) { bool cliMaySendCachedMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (!transQueueEmpty(&conn->cliMsgs)) {
cliSend(conn); cliSend(conn);
return true; return true;
} else {
return false;
} }
return false;
} }
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
...@@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead); transMsg.pCont = transContFromHead((char*)pHead);
...@@ -204,15 +205,14 @@ void cliHandleResp(SCliConn* conn) { ...@@ -204,15 +205,14 @@ void cliHandleResp(SCliConn* conn) {
CONN_SHOULD_RELEASE(conn, pHead); CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = NULL; SCliMsg* pMsg = transQueuePop(&conn->cliMsgs);
if (taosArrayGetSize(conn->cliMsgs) > 0) {
pMsg = taosArrayGetP(conn->cliMsgs, 0);
taosArrayRemove(conn->cliMsgs, 0);
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
...@@ -264,7 +264,7 @@ _RETURN: ...@@ -264,7 +264,7 @@ _RETURN:
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
if (taosArrayGetSize(pConn->cliMsgs) == 0) { if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return; return;
...@@ -274,11 +274,7 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -274,11 +274,7 @@ void cliHandleExcept(SCliConn* pConn) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
do { do {
SCliMsg* pMsg = NULL; SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
if (taosArrayGetSize(pConn->cliMsgs) > 0) {
pMsg = taosArrayGetP(pConn->cliMsgs, 0);
taosArrayRemove(pConn->cliMsgs, 0);
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
...@@ -289,6 +285,9 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -289,6 +285,9 @@ void cliHandleExcept(SCliConn* pConn) {
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
}
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
...@@ -303,7 +302,7 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -303,7 +302,7 @@ void cliHandleExcept(SCliConn* pConn) {
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (taosArrayGetSize(pConn->cliMsgs) > 0); } while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
...@@ -380,21 +379,20 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -380,21 +379,20 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrdObj* thrd = conn->hostThrd; SCliThrdObj* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd); CONN_HANDLE_THREAD_QUIT(thrd);
char key[128] = {0}; STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transCtxCleanup(&conn->ctx);
transQueueClear(&conn->cliMsgs);
conn->status = ConnNormal;
transCtxDestroy(&conn->ctx); char key[128] = {0};
tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key, conn->ip, strlen(conn->ip));
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->status = ConnNormal;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
taosArrayClear(conn->cliMsgs);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn)); assert(!QUEUE_IS_EMPTY(&plist->conn));
} }
...@@ -445,7 +443,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { ...@@ -445,7 +443,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->writeReq.data = conn; conn->writeReq.data = conn;
conn->connReq.data = conn; conn->connReq.data = conn;
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
...@@ -465,18 +464,18 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -465,18 +464,18 @@ static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
free(conn->ip); free(conn->ip);
free(conn->stream); free(conn->stream);
transCtxDestroy(&conn->ctx); transCtxCleanup(&conn->ctx);
taosArrayDestroy(conn->cliMsgs); transQueueDestroy(&conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
static bool cliHandleNoResp(SCliConn* conn) { static bool cliHandleNoResp(SCliConn* conn) {
bool res = false; bool res = false;
SArray* msgs = conn->cliMsgs; if (!transQueueEmpty(&conn->cliMsgs)) {
if (taosArrayGetSize(msgs) > 0) { SCliMsg* pMsg = transQueueGet(&conn->cliMsgs);
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) { if (REQUEST_NO_RESP(&pMsg->msg)) {
taosArrayRemove(msgs, 0); transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0);
destroyCmsg(pMsg); destroyCmsg(pMsg);
res = true; res = true;
} }
...@@ -509,8 +508,9 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -509,8 +508,9 @@ static void cliSendCb(uv_write_t* req, int status) {
void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn); CONN_HANDLE_BROKEN(pConn);
assert(taosArrayGetSize(pConn->cliMsgs) > 0); // assert(taosArrayGetSize(pConn->cliMsgs) > 0);
SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0); assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs);
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
...@@ -600,9 +600,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -600,9 +600,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (T_REF_VAL_GET(conn) == 2) { if (T_REF_VAL_GET(conn) == 2) {
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
taosArrayPush(conn->cliMsgs, &pMsg); if (!transQueuePush(&conn->cliMsgs, pMsg)) {
if (taosArrayGetSize(conn->cliMsgs) >= 2) { return;
return; // send one by one
} }
cliSend(conn); cliSend(conn);
} else { } else {
...@@ -643,17 +642,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -643,17 +642,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
transCtxMerge(&conn->ctx, &pCtx->appCtx); transCtxMerge(&conn->ctx, &pCtx->appCtx);
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (!transQueuePush(&conn->cliMsgs, pMsg)) {
taosArrayPush(conn->cliMsgs, &pMsg);
return; return;
} }
taosArrayPush(conn->cliMsgs, &pMsg);
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
cliSend(conn); cliSend(conn);
} else { } else {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
taosArrayPush(conn->cliMsgs, &pMsg); transQueuePush(&conn->cliMsgs, pMsg);
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(pMsg->ctx->ip); conn->ip = strdup(pMsg->ctx->ip);
......
...@@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) { ...@@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) {
// init transCtx // init transCtx
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
} }
void transCtxDestroy(STransCtx* ctx) { void transCtxCleanup(STransCtx* ctx) {
if (ctx->args == NULL) { if (ctx->args == NULL) {
return; return;
} }
...@@ -238,12 +238,14 @@ void transCtxDestroy(STransCtx* ctx) { ...@@ -238,12 +238,14 @@ void transCtxDestroy(STransCtx* ctx) {
iter->free(iter->val); iter->free(iter->val);
iter = taosHashIterate(ctx->args, iter); iter = taosHashIterate(ctx->args, iter);
} }
taosHashCleanup(ctx->args); taosHashCleanup(ctx->args);
} }
void transCtxMerge(STransCtx* dst, STransCtx* src) { void transCtxMerge(STransCtx* dst, STransCtx* src) {
if (dst->args == NULL) { if (dst->args == NULL) {
dst->args = src->args; dst->args = src->args;
dst->brokenVal = src->brokenVal;
src->args = NULL; src->args = NULL;
return; return;
} }
...@@ -275,5 +277,58 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { ...@@ -275,5 +277,58 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
memcpy(ret, (char*)cVal->val, cVal->len); memcpy(ret, (char*)cVal->val, cVal->len);
return (void*)ret; return (void*)ret;
} }
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
char* ret = calloc(1, ctx->brokenVal.len);
memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len);
*msgType = ctx->brokenVal.msgType;
return (void*)ret;
}
void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*));
queue->free = free;
}
bool transQueuePush(STransQueue* queue, void* arg) {
taosArrayPush(queue->q, &arg);
if (taosArrayGetSize(queue->q) > 1) {
return false;
}
return true;
}
void* transQueuePop(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
taosArrayRemove(queue->q, 0);
return ptr;
}
void* transQueueGet(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
return ptr;
}
bool transQueueEmpty(STransQueue* queue) {
//
return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
if (queue->free != NULL) {
for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
void* p = taosArrayGetP(queue->q, i);
queue->free(p);
}
}
taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
}
#endif #endif
...@@ -37,7 +37,7 @@ typedef struct SSrvConn { ...@@ -37,7 +37,7 @@ typedef struct SSrvConn {
void* pTransInst; // rpc init void* pTransInst; // rpc init
void* ahandle; // void* ahandle; //
void* hostThrd; void* hostThrd;
SArray* srvMsgs; STransQueue srvMsgs;
SSrvRegArg regArg; SSrvRegArg regArg;
bool broken; // conn broken; bool broken; // conn broken;
...@@ -62,12 +62,12 @@ typedef struct SSrvMsg { ...@@ -62,12 +62,12 @@ typedef struct SSrvMsg {
} SSrvMsg; } SSrvMsg;
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
TdThread thread; TdThread thread;
uv_pipe_t* pipe; uv_pipe_t* pipe;
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
queue conn; queue conn;
...@@ -76,7 +76,7 @@ typedef struct SWorkThrdObj { ...@@ -76,7 +76,7 @@ typedef struct SWorkThrdObj {
} SWorkThrdObj; } SWorkThrdObj;
typedef struct SServerObj { typedef struct SServerObj {
TdThread thread; TdThread thread;
uv_tcp_t server; uv_tcp_t server;
uv_loop_t* loop; uv_loop_t* loop;
...@@ -106,8 +106,7 @@ static const char* notify = "a"; ...@@ -106,8 +106,7 @@ static const char* notify = "a";
srvMsg->msg = tmsg; \ srvMsg->msg = tmsg; \
srvMsg->type = Release; \ srvMsg->type = Release; \
srvMsg->pConn = conn; \ srvMsg->pConn = conn; \
taosArrayPush(conn->srvMsgs, &srvMsg); \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
if (taosArrayGetSize(conn->srvMsgs) > 1) { \
return; \ return; \
} \ } \
uvStartSendRespInternal(srvMsg); \ uvStartSendRespInternal(srvMsg); \
...@@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) {
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
tTrace("server conn %p data already was written on stream", conn); tTrace("server conn %p data already was written on stream", conn);
if (conn->srvMsgs != NULL) { if (!transQueueEmpty(&conn->srvMsgs)) {
assert(taosArrayGetSize(conn->srvMsgs) >= 1); SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
taosArrayRemove(conn->srvMsgs, 0);
if (msg->type == Release && conn->status != ConnNormal) { if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal; conn->status = ConnNormal;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
destroySmsg(msg); destroySmsg(msg);
// send second data, just use for push // send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) { if (!transQueueEmpty(&conn->srvMsgs)) {
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs);
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) { if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0; conn->regArg.notifyCount = 0;
conn->regArg.init = 1; conn->regArg.init = 1;
...@@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg)); memset(&conn->regArg, 0, sizeof(conn->regArg));
} }
taosArrayRemove(conn->srvMsgs, 0); transQueuePop(&conn->srvMsgs);
free(msg); free(msg);
} else { } else {
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
...@@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
} }
taosArrayPush(pConn->srvMsgs, &smsg); if (!transQueuePush(&pConn->srvMsgs, smsg)) {
if (taosArrayGetSize(pConn->srvMsgs) > 1) {
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
return; return;
} }
uvStartSendRespInternal(smsg); uvStartSendRespInternal(smsg);
...@@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) { ...@@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) {
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("server conn %p created", pConn); transQueueInit(&pConn->srvMsgs, NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg)); memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false; pConn->broken = false;
pConn->status = ConnNormal; pConn->status = ConnNormal;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tTrace("server conn %p created", pConn);
return pConn; return pConn;
} }
...@@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { ...@@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
} }
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { transQueueDestroy(&conn->srvMsgs);
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
destroySmsg(msg);
}
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
if (clear) { if (clear) {
tTrace("server conn %p to be destroyed", conn); tTrace("server conn %p to be destroyed", conn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
...@@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
// release handle to rpc init // release handle to rpc init
SSrvConn* conn = msg->pConn; SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
taosArrayPush(conn->srvMsgs, &msg); if (!transQueuePush(&conn->srvMsgs, msg)) {
if (taosArrayGetSize(conn->srvMsgs) > 1) {
return; return;
} }
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
...@@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn; SSrvConn* conn = msg->pConn;
tDebug("server conn %p register brokenlink callback", conn); tDebug("server conn %p register brokenlink callback", conn);
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) { if (!transQueuePush(&conn->srvMsgs, msg)) {
taosArrayPush(conn->srvMsgs, &msg);
return; return;
} }
conn->regArg.notifyCount = 0; conn->regArg.notifyCount = 0;
......
...@@ -364,9 +364,12 @@ TEST_F(TransEnv, srvReleaseHandle) { ...@@ -364,9 +364,12 @@ TEST_F(TransEnv, srvReleaseHandle) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
tr->SetSrvContinueSend(processReleaseHandleCb); tr->SetSrvContinueSend(processReleaseHandleCb);
// tr->Restart(processReleaseHandleCb); // tr->Restart(processReleaseHandleCb);
void *handle = NULL; void * handle = NULL;
SRpcMsg req = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -378,8 +381,11 @@ TEST_F(TransEnv, srvReleaseHandle) { ...@@ -378,8 +381,11 @@ TEST_F(TransEnv, srvReleaseHandle) {
} }
TEST_F(TransEnv, cliReleaseHandleExcept) { TEST_F(TransEnv, cliReleaseHandleExcept) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -396,8 +402,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) { ...@@ -396,8 +402,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) {
} }
TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvContinueSend) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
SRpcMsg req = {0}, resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}, resp = {0}; memset(&req, 0, sizeof(req));
memset(&resp, 0, sizeof(resp));
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -410,8 +418,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { ...@@ -410,8 +418,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
// tr->SetCliPersistFp(cliPersistHandle); // tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -428,8 +438,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) { ...@@ -428,8 +438,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
TEST_F(TransEnv, cliPersistHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -450,8 +462,11 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { ...@@ -450,8 +462,11 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) {
TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, queryExcept) {
tr->SetSrvContinueSend(processRegisterFailure); tr->SetSrvContinueSend(processRegisterFailure);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -466,8 +481,10 @@ TEST_F(TransEnv, queryExcept) { ...@@ -466,8 +481,10 @@ TEST_F(TransEnv, queryExcept) {
} }
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.noResp = 1}; memset(&req, 0, sizeof(req));
req.noResp = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
......
...@@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test { ...@@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test {
// TODO // TODO
} }
virtual void TearDown() { virtual void TearDown() {
transCtxDestroy(ctx); transCtxCleanup(ctx);
// formate // formate
} }
STransCtx *ctx; STransCtx *ctx;
......
...@@ -68,7 +68,7 @@ gitPullBranchInfo $TDengineBrVer ...@@ -68,7 +68,7 @@ gitPullBranchInfo $TDengineBrVer
compileTDengineVersion compileTDengineVersion
taos_dir=${projectDir}/debug/tools/shell taos_dir=${projectDir}/debug/tools/shell
taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon taosd_dir=${projectDir}/debug/source/dnode/mgmt/main
exec_process_dir=${projectDir}/debug/tests/test/c exec_process_dir=${projectDir}/debug/tests/test/c
rm -f /usr/bin/taos rm -f /usr/bin/taos
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0
sql show databases
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 bigint) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 3 then
return -1
endi
print =============== insert data, include NULL
sql insert into ct1 values (now+0s, 10, 2.0, 3.0, 90)(now+1s, NULL, NULL, NULL, NULL)(now+2s, NULL, 2.1, 3.1, 91)(now+3s, 11, NULL, 3.2, 92)(now+4s, 12, 2.2, NULL, 93)(now+5s, 13, 2.3, 3.3, NULL)
sql insert into ct1 values (now+6s, NULL, 2.4, 3.4, 94)
sql insert into ct1 values (now+7s, 14, NULL, 3.5, 95)
sql insert into ct1 values (now+8s, 15, 2.5, NULL, 96)
sql insert into ct1 values (now+9s, 16, 2.6, 3.6, NULL)
sql insert into ct1 values (now+10s, NULL, NULL, NULL, NULL)
sql insert into ct1 values (now+11s, -2147483648, 2.7, 3.7, 97)
#===================================================================
#===================================================================
print =============== query data from child table
sql select * from ct1
print ===> select * from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 12 then
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 2.00000 then
return -1
endi
if $data03 != 3.000000000 then
return -1
endi
#if $data41 != -14 then
# return -1
#endi
#if $data42 != -2.40000 then
# return -1
#endi
#if $data43 != -3.400000000 then
# return -1
#endi
print =============== select count(*) from child table
sql select count(*) from ct1
print ===> select count(*) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
if $data00 != 4 then
return -1
endi
print =============== select count(column) from child table
sql select count(ts), count(c1), count(c2), count(c3) from ct1
print ===> select count(ts), count(c1), count(c2), count(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $data00 != 4 then
return -1
endi
if $data01 != 4 then
return -1
endi
if $data02 != 4 then
return -1
endi
if $data03 != 4 then
return -1
endi
#print =============== select first(*)/first(column) from child table
#sql select first(*) from ct1
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
print =============== select min(column) from child table
sql select min(c1), min(c2), min(c3) from ct1
print ===> select min(c1), min(c2), min(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 10 then
return -1
endi
if $data01 != 2.00000 then
return -1
endi
if $data02 != 3.000000000 then
return -1
endi
print =============== select max(column) from child table
sql select max(c1), max(c2), max(c3) from ct1
print ===> select max(c1), max(c2), max(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 13 then
return -1
endi
if $data01 != 2.30000 then
return -1
endi
if $data02 != 3.300000000 then
return -1
endi
print =============== select sum(column) from child table
sql select sum(c1), sum(c2), sum(c3) from ct1
print ===> select sum(c1), sum(c2), sum(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 46 then
return -1
endi
if $data01 != 8.599999905 then
return -1
endi
if $data02 != 12.600000000 then
return -1
endi
print =============== select column, from child table
sql select c1, c2, c3 from ct1
print ===> select c1, c2, c3 from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
#if $rows != 4 then
# return -1
#endi
#if $data00 != 10 then
# return -1
#endi
#if $data01 != 2.00000 then
# return -1
#endi
#if $data02 != 3.000000000 then
# return -1
#endi
#if $data10 != 11 then
# return -1
#endi
#if $data11 != 2.10000 then
# return -1
#endi
#if $data12 != 3.100000000 then
# return -1
#endi
#if $data30 != 13 then
# return -1
#endi
#if $data31 != 2.30000 then
# return -1
#endi
#if $data32 != 3.300000000 then
# return -1
#endi
#===================================================================
#===================================================================
return
#print =============== query data from stb
#sql select * from stb
#print ===>
#print ===> rows: $rows
#print ===> rows0: $data00 $data01 $data02 $data03 $data04
#if $rows != 4 then
# return -1
#endi
#print =============== select count(*) from supter table
#sql select count(*) from stb
#if $rows != 1 then
# return -1
#endi
#
#print $data00 $data01 $data02
#if $data00 != 8 then
# return -1
#endi
#
#print =============== select count(column) from supter table
#sql select count(ts), count(c1), count(c2), count(c3) from stb
#print $data00 $data01 $data02 $data03
#if $data00 != 8 then
# return -1
#endi
#if $data01 != 8 then
# return -1
#endi
#if $data02 != 8 then
# return -1
#endi
#if $data03 != 8 then
# return -1
#endi
#===================================================================
#===================================================================
print =============== stop and restart taosd, then again do query above
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql select * from ct1
if $rows != 4 then # after fix bug, modify 4 to 7
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 2.00000 then
return -1
endi
if $data03 != 3.000000000 then
return -1
endi
#if $data41 != -14 then
# return -1
#endi
#if $data42 != -2.40000 then
# return -1
#endi
#if $data43 != -3.400000000 then
# return -1
#endi
print =============== select count(*) from child table
sql select count(*) from ct1
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
if $data00 != 4 then
return -1
endi
print =============== select count(column) from child table
sql select count(ts), count(c1), count(c2), count(c3) from ct1
print $data00 $data01 $data02 $data03
if $data00 != 4 then
return -1
endi
if $data01 != 4 then
return -1
endi
if $data02 != 4 then
return -1
endi
if $data03 != 4 then
return -1
endi
#print =============== select first(*)/first(column) from child table
#sql select first(*) from ct1
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
print =============== select min(column) from child table
sql select min(c1), min(c2), min(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 10 then
return -1
endi
if $data01 != 2.00000 then
return -1
endi
if $data02 != 3.000000000 then
return -1
endi
print =============== select max(column) from child table
sql select max(c1), max(c2), max(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 13 then
return -1
endi
if $data01 != 2.30000 then
return -1
endi
if $data02 != 3.300000000 then
return -1
endi
print =============== select sum(column) from child table
sql select sum(c1), sum(c2), sum(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 46 then
return -1
endi
if $data01 != 8.599999905 then
return -1
endi
if $data02 != 12.600000000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -31,60 +31,91 @@ if $rows != 2 then ...@@ -31,60 +31,91 @@ if $rows != 2 then
return -1 return -1
endi endi
print =============== insert data into child table print =============== insert data into child table ct1 (s)
sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:06.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:06.000', 2 )
sql insert into ct1 values ( '2022-01-01 01:01:10.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:10.000', 3 )
sql insert into ct1 values ( '2022-01-01 01:01:16.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:16.000', 4 )
sql insert into ct1 values ( '2022-01-01 01:01:20.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:20.000', 5 )
sql insert into ct1 values ( '2022-01-01 01:01:26.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 1 ) sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 )
sql insert into ct1 values ( '2022-01-01 01:01:40.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:46.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:50.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:56.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:00.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:06.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:10.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:16.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:20.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:26.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:30.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:36.000', 1 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> $rows $data00 $data01 $data02 $data03 $data04 print ===> rows: $rows
if $rows != 10 then print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data40 != 1 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data40 != 1 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 8 then
return -1 return -1
endi endi
if $data00 != 2 then if $data00 != 2 then
return -1 return -1
endi endi
if $data04 != 2 then if $data70 != 1 then
return -1 return -1
endi endi
print =============== insert data into child table ct2 (d)
sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 ) sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-01 12:00:01.000', 2 ) sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 )
sql insert into ct2 values ( '2022-01-01 23:00:01.000', 3 ) sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 1 ) sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 1 ) sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 )
sql insert into ct2 values ( '2022-01-04 10:00:01.000', 1 ) sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 )
sql insert into ct2 values ( '2022-01-05 10:00:01.000', 1 ) sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 )
sql insert into ct2 values ( '2022-01-06 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-07 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-08 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-09 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-10 10:00:01.000', 1 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2w) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24 print ===> rows2: $data20 $data21 $data22 $data23 $data24
if $rows != 11 then print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 4 then
return -1 return -1
endi endi
if $data00 != 1 then if $data00 != 1 then
...@@ -94,8 +125,37 @@ if $data10 != 2 then ...@@ -94,8 +125,37 @@ if $data10 != 2 then
return -1 return -1
endi endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 7 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data60 != 1 then
return -1
endi
return return
sql select count(*) from car interval(1n, 10d) order by ts desc sql select count(*) from car interval(1n, 10d) order by ts desc
# tdSql.checkData(0, 1, 1) # tdSql.checkData(0, 1, 1)
# tdSql.checkData(1, 1, 2) # tdSql.checkData(1, 1, 2)
......
...@@ -306,8 +306,9 @@ int32_t init_env() { ...@@ -306,8 +306,9 @@ int32_t init_env() {
} }
//const char* sql = "select * from tu1"; //const char* sql = "select * from tu1";
sprintf(sqlStr, "select * from %s%d", g_stConfInfo.stbName, 0); sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0);
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr)); /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册