提交 30f562e5 编写于 作者: dengyihao's avatar dengyihao

Merge branch '3.0' of https://github.com/taosdata/TDengine into ddd

......@@ -213,6 +213,7 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
add_compile_options(-Wno-sign-compare)
if (${TD_WINDOWS})
file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
......
......@@ -1920,13 +1920,14 @@ typedef struct {
} SVCreateTSmaReq;
typedef struct {
int8_t type; // 0 status report, 1 update data
char indexName[TSDB_INDEX_NAME_LEN]; //
STimeWindow windows;
int8_t type; // 0 status report, 1 update data
int64_t indexUid;
int64_t skey; // start TS key of interval/sliding window
} STSmaMsg;
typedef struct {
int64_t ver; // use a general definition
int64_t indexUid;
char indexName[TSDB_INDEX_NAME_LEN];
} SVDropTSmaReq;
......@@ -2323,9 +2324,9 @@ struct SRpcMsg;
struct SEpSet;
struct SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
#ifdef __cplusplus
}
......
......@@ -190,6 +190,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
......@@ -29,6 +29,7 @@ typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SSnode SSnode;
typedef struct {
int32_t reserved;
} SSnodeLoad;
typedef struct {
......
......@@ -460,7 +460,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* pStr = NULL;
char* astStr = NULL;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) {
......@@ -488,17 +488,17 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return);
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T };
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, topicName);
SCMCreateTopicReq req = {
.igExists = 1,
.ast = (char*)pStr,
.ast = (char*)astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
......@@ -512,7 +512,11 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tSerializeSCMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
......
......@@ -2641,12 +2641,14 @@ int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeFixedI64(buf, pReq->indexUid);
tlen += taosEncodeString(buf, pReq->indexName);
return tlen;
}
void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeFixedI64(buf, &(pReq->indexUid));
buf = taosDecodeStringTo(buf, pReq->indexName);
return buf;
......
......@@ -148,4 +148,5 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg);
}
......@@ -24,6 +24,8 @@
extern "C" {
#endif
typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType;
typedef struct SVnodesMgmt {
SHashObj *hash;
SRWLatch latch;
......@@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
......
......@@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue;
vnodeOpt.sendReqFp = dndSendReqToDnode;
vnodeOpt.sendMnodeReqFp = dndSendReqToMnode;
vnodeOpt.sendRspFp = dndSendRsp;
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
goto _OVER;
......
......@@ -265,6 +265,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
......
......@@ -16,46 +16,109 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp);
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
}
if (msgType & 1u) {
if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode query queue", pMsg);
vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode fetch queue", pMsg);
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode write queue", pMsg);
void *ptr = taosArrayPush(pArray, &pMsg);
assert(ptr != NULL);
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
}
vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pRsp = NULL;
numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
SRpcMsg *pRsp = NULL;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle;
dndSendRsp(pVnode->pWrapper, pRsp);
free(pRsp);
} else {
if (code != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
dndSendRsp(pVnode->pWrapper, &rpcRsp);
if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
}
for (size_t i = 0; i < numOfMsgs; i++) {
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
......@@ -89,93 +152,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf
}
}
static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) {
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
SMsgHead *pHead = pMsg->pCont;
SMsgHead *pHead = pRpc->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
return -1;
}
return pVnode;
}
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode == NULL) return -1;
switch (qtype) {
case VND_QUERY_QUEUE:
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case VND_FETCH_QUEUE:
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case VND_WRITE_QUEUE:
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
code = taosWriteQitem(pVnode->pWriteQ, pMsg);
case VND_SYNC_QUEUE:
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
code = taosWriteQitem(pVnode->pSyncQ, pMsg);
default:
terrno = TSDB_CODE_INVALID_PARA;
break;
}
int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg);
vmReleaseVnode(pMgmt, pVnode);
return code;
}
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode == NULL) return -1;
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE);
}
int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg);
vmReleaseVnode(pMgmt, pVnode);
return code;
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE);
}
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode == NULL) return -1;
int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg);
vmReleaseVnode(pMgmt, pVnode);
return code;
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE);
}
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg);
if (pVnode == NULL) return -1;
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE);
}
int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg);
vmReleaseVnode(pMgmt, pVnode);
return code;
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
}
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId);
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) {
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
switch (qtype) {
case VND_QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case VND_FETCH_QUEUE:
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case VND_APPLY_QUEUE:
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
break;
case VND_WRITE_QUEUE:
case VND_SYNC_QUEUE:
default:
terrno = TSDB_CODE_INVALID_PARA;
break;
}
}
vmReleaseVnode(pMgmt, pVnode);
return code;
}
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
// pHead->vgId = htonl(pHead->vgId);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE);
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE);
}
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg != NULL) {
pMsg->rpcMsg = *pRpc;
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
}
vmReleaseVnode(pMgmt, pVnode);
return code;
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE);
}
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
......@@ -191,6 +273,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
return -1;
}
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
return 0;
}
......@@ -205,43 +288,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pSyncQ = NULL;
pVnode->pFetchQ = NULL;
pVnode->pQueryQ = NULL;
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in mgmt queue", pMsg);
}
if (msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
dndSendRsp(pMgmt->pWrapper, &rsp);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
......@@ -275,7 +322,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
if (tWWorkerInit(pWPool) != 0) return -1;
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1;
}
......@@ -291,9 +338,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed");
}
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
}
\ No newline at end of file
......@@ -802,6 +802,32 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
SDropDbRsp dropRsp = {0};
if (pDb != NULL) {
memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
dropRsp.uid = pDb->uid;
}
int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp);
void *pRsp = NULL;
if (useRpcMalloc) {
pRsp = rpcMallocCont(rspLen);
} else {
pRsp = malloc(rspLen);
}
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp);
*pRspLen = rspLen;
*ppRsp = pRsp;
return 0;
}
static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg);
......@@ -814,18 +840,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
SDropDbRsp dropRsp = {0};
memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
dropRsp.uid = pDb->uid;
int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp);
void *pRsp = malloc(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto DROP_DB_OVER;
}
tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp);
int32_t rspLen = 0;
void *pRsp = NULL;
if (mndBuildDropDbRsp(pDb, &rspLen, &pRsp, false) < 0) goto DROP_DB_OVER;
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
......@@ -854,7 +871,7 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq) {
pDb = mndAcquireDb(pMnode, dropReq.db);
if (pDb == NULL) {
if (dropReq.ignoreNotExists) {
code = 0;
code = mndBuildDropDbRsp(pDb, &pReq->rspLen, &pReq->pRsp, true);
goto DROP_DB_OVER;
} else {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
......
......@@ -77,9 +77,9 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
}
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
SStreamExecMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
if (pTask == NULL) {
return -1;
}
......
......@@ -55,6 +55,8 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus
......
......@@ -62,6 +62,7 @@ typedef struct {
typedef struct {
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
......@@ -125,9 +126,8 @@ void vnodeDestroy(const char *path);
*
* @param pVnode The vnode object.
* @param pMsgs The array of SRpcMsg
* @return int 0 for success, -1 for failure
*/
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
/**
* @brief Apply a write request message.
......
......@@ -46,15 +46,18 @@ typedef struct SVnodeTask {
typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag;
// For commit
bool stop;
uint16_t nthreads;
TdThread* threads;
bool stop;
uint16_t nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt
PutToQueueFp putToQueryQFp;
SendReqFp sendReqFp;
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeMgr;
extern SVnodeMgr vnodeMgr;
......@@ -85,7 +88,10 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp);
#define vFatal(...) \
do { \
......
......@@ -433,3 +433,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
//
return 0;
}
......@@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
}
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
......
......@@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) {
vnodeMgr.stop = false;
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
vnodeMgr.putToFetchQFp = pOption->putToFetchQFp;
vnodeMgr.sendReqFp = pOption->sendReqFp;
vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp;
vnodeMgr.sendRspFp = pOption->sendRspFp;
// Start commit handers
if (pOption->nthreads > 0) {
......@@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
}
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) {
terrno = TSDB_CODE_VND_APP_ERROR;
return -1;
}
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
}
void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
(*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
}
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq);
}
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) {
(*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp);
}
/* ------------------------ STATIC METHODS ------------------------ */
......
......@@ -24,9 +24,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
(putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq);
}
void vnodeQueryClose(SVnode *pVnode) {
qWorkerDestroy((void **)&pVnode->pQuery);
}
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing");
......@@ -68,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:
......
......@@ -16,7 +16,7 @@
#include "tq.h"
#include "vnd.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg;
SRpcMsg *pRpc;
......@@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
// TODO: Integrate RAFT module here
return 0;
// No results are returned because error handling is difficult
// return 0;
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
......@@ -68,7 +68,7 @@ gitPullBranchInfo $TDengineBrVer
compileTDengineVersion
taos_dir=${projectDir}/debug/tools/shell
taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon
taosd_dir=${projectDir}/debug/source/dnode/mgmt/main
exec_process_dir=${projectDir}/debug/tests/test/c
rm -f /usr/bin/taos
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0
sql show databases
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 bigint) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 3 then
return -1
endi
print =============== insert data, include NULL
sql insert into ct1 values (now+0s, 10, 2.0, 3.0, 90)(now+1s, NULL, NULL, NULL, NULL)(now+2s, NULL, 2.1, 3.1, 91)(now+3s, 11, NULL, 3.2, 92)(now+4s, 12, 2.2, NULL, 93)(now+5s, 13, 2.3, 3.3, NULL)
sql insert into ct1 values (now+6s, NULL, 2.4, 3.4, 94)
sql insert into ct1 values (now+7s, 14, NULL, 3.5, 95)
sql insert into ct1 values (now+8s, 15, 2.5, NULL, 96)
sql insert into ct1 values (now+9s, 16, 2.6, 3.6, NULL)
sql insert into ct1 values (now+10s, NULL, NULL, NULL, NULL)
sql insert into ct1 values (now+11s, -2147483648, 2.7, 3.7, 97)
#===================================================================
#===================================================================
print =============== query data from child table
sql select * from ct1
print ===> select * from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 12 then
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 2.00000 then
return -1
endi
if $data03 != 3.000000000 then
return -1
endi
#if $data41 != -14 then
# return -1
#endi
#if $data42 != -2.40000 then
# return -1
#endi
#if $data43 != -3.400000000 then
# return -1
#endi
print =============== select count(*) from child table
sql select count(*) from ct1
print ===> select count(*) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
if $data00 != 4 then
return -1
endi
print =============== select count(column) from child table
sql select count(ts), count(c1), count(c2), count(c3) from ct1
print ===> select count(ts), count(c1), count(c2), count(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $data00 != 4 then
return -1
endi
if $data01 != 4 then
return -1
endi
if $data02 != 4 then
return -1
endi
if $data03 != 4 then
return -1
endi
#print =============== select first(*)/first(column) from child table
#sql select first(*) from ct1
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
print =============== select min(column) from child table
sql select min(c1), min(c2), min(c3) from ct1
print ===> select min(c1), min(c2), min(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 10 then
return -1
endi
if $data01 != 2.00000 then
return -1
endi
if $data02 != 3.000000000 then
return -1
endi
print =============== select max(column) from child table
sql select max(c1), max(c2), max(c3) from ct1
print ===> select max(c1), max(c2), max(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 13 then
return -1
endi
if $data01 != 2.30000 then
return -1
endi
if $data02 != 3.300000000 then
return -1
endi
print =============== select sum(column) from child table
sql select sum(c1), sum(c2), sum(c3) from ct1
print ===> select sum(c1), sum(c2), sum(c3) from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
if $rows != 1 then
return -1
endi
if $data00 != 46 then
return -1
endi
if $data01 != 8.599999905 then
return -1
endi
if $data02 != 12.600000000 then
return -1
endi
print =============== select column, from child table
sql select c1, c2, c3 from ct1
print ===> select c1, c2, c3 from ct1
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
#if $rows != 4 then
# return -1
#endi
#if $data00 != 10 then
# return -1
#endi
#if $data01 != 2.00000 then
# return -1
#endi
#if $data02 != 3.000000000 then
# return -1
#endi
#if $data10 != 11 then
# return -1
#endi
#if $data11 != 2.10000 then
# return -1
#endi
#if $data12 != 3.100000000 then
# return -1
#endi
#if $data30 != 13 then
# return -1
#endi
#if $data31 != 2.30000 then
# return -1
#endi
#if $data32 != 3.300000000 then
# return -1
#endi
#===================================================================
#===================================================================
return
#print =============== query data from stb
#sql select * from stb
#print ===>
#print ===> rows: $rows
#print ===> rows0: $data00 $data01 $data02 $data03 $data04
#if $rows != 4 then
# return -1
#endi
#print =============== select count(*) from supter table
#sql select count(*) from stb
#if $rows != 1 then
# return -1
#endi
#
#print $data00 $data01 $data02
#if $data00 != 8 then
# return -1
#endi
#
#print =============== select count(column) from supter table
#sql select count(ts), count(c1), count(c2), count(c3) from stb
#print $data00 $data01 $data02 $data03
#if $data00 != 8 then
# return -1
#endi
#if $data01 != 8 then
# return -1
#endi
#if $data02 != 8 then
# return -1
#endi
#if $data03 != 8 then
# return -1
#endi
#===================================================================
#===================================================================
print =============== stop and restart taosd, then again do query above
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql select * from ct1
if $rows != 4 then # after fix bug, modify 4 to 7
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 2.00000 then
return -1
endi
if $data03 != 3.000000000 then
return -1
endi
#if $data41 != -14 then
# return -1
#endi
#if $data42 != -2.40000 then
# return -1
#endi
#if $data43 != -3.400000000 then
# return -1
#endi
print =============== select count(*) from child table
sql select count(*) from ct1
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
if $data00 != 4 then
return -1
endi
print =============== select count(column) from child table
sql select count(ts), count(c1), count(c2), count(c3) from ct1
print $data00 $data01 $data02 $data03
if $data00 != 4 then
return -1
endi
if $data01 != 4 then
return -1
endi
if $data02 != 4 then
return -1
endi
if $data03 != 4 then
return -1
endi
#print =============== select first(*)/first(column) from child table
#sql select first(*) from ct1
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
print =============== select min(column) from child table
sql select min(c1), min(c2), min(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 10 then
return -1
endi
if $data01 != 2.00000 then
return -1
endi
if $data02 != 3.000000000 then
return -1
endi
print =============== select max(column) from child table
sql select max(c1), max(c2), max(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 13 then
return -1
endi
if $data01 != 2.30000 then
return -1
endi
if $data02 != 3.300000000 then
return -1
endi
print =============== select sum(column) from child table
sql select sum(c1), sum(c2), sum(c3) from ct1
print $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 46 then
return -1
endi
if $data01 != 8.599999905 then
return -1
endi
if $data02 != 12.600000000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -31,60 +31,91 @@ if $rows != 2 then
return -1
endi
print =============== insert data into child table
print =============== insert data into child table ct1 (s)
sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:06.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:10.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:16.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:20.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:26.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:40.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:46.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:50.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:56.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:00.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:06.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:10.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:16.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:20.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:26.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:30.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:02:36.000', 1 )
sql insert into ct1 values ( '2022-01-01 01:01:06.000', 2 )
sql insert into ct1 values ( '2022-01-01 01:01:10.000', 3 )
sql insert into ct1 values ( '2022-01-01 01:01:16.000', 4 )
sql insert into ct1 values ( '2022-01-01 01:01:20.000', 5 )
sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> $rows $data00 $data01 $data02 $data03 $data04
if $rows != 10 then
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data40 != 1 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data40 != 1 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 8 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data04 != 2 then
if $data70 != 1 then
return -1
endi
print =============== insert data into child table ct2 (d)
sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-01 12:00:01.000', 2 )
sql insert into ct2 values ( '2022-01-01 23:00:01.000', 3 )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-04 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-05 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-06 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-07 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-08 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-09 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-10 10:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 )
sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 )
sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 )
sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
if $rows != 11 then
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 4 then
return -1
endi
if $data00 != 1 then
......@@ -94,8 +125,37 @@ if $data10 != 2 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
if $rows != 7 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data60 != 1 then
return -1
endi
return
sql select count(*) from car interval(1n, 10d) order by ts desc
# tdSql.checkData(0, 1, 1)
# tdSql.checkData(1, 1, 2)
......
......@@ -306,8 +306,9 @@ int32_t init_env() {
}
//const char* sql = "select * from tu1";
sprintf(sqlStr, "select * from %s%d", g_stConfInfo.stbName, 0);
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册