diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 5a141867e7b56693eece323a8e039e7afe94b1dd..6d341c61c76c85df04bf9cc97bd6666c18c83fe8 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -88,9 +88,9 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = taos_query(pConn, - "create stream stream1 trigger at_once into abc2.outstb as select _wstartts, sum(k) from st1 " - "partition by tbname interval(10m) "); + pRes = taos_query( + pConn, + "create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -107,11 +107,4 @@ int main(int argc, char* argv[]) { code = init_env(); } create_stream(); -#if 0 - tmq_t* tmq = build_consumer(); - tmq_list_t* topic_list = build_topic_list(); - /*perf_loop(tmq, topic_list);*/ - /*basic_consume_loop(tmq, topic_list);*/ - sync_consume_loop(tmq, topic_list); -#endif } diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index e99377f9b4b27871506d1739520060b8caa51417..b56f7552666dd23650f9911dc23e03b9ae2fa707 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -34,7 +34,6 @@ typedef enum { WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, - MERGE_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 611bff49f116c3a10363c607474ebb90eb275186..3d0ef2e052ef20978156b76b1dd7d552391b4a8b 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -16,8 +16,8 @@ #ifndef _TD_SNODE_H_ #define _TD_SNODE_H_ -#include "tmsgcb.h" #include "tmsg.h" +#include "tmsgcb.h" #include "trpc.h" #ifdef __cplusplus @@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pMsg The request message * @param pRsp The response message */ -void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); -void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 8ed95b60101472b347b10b285fcacaa3123cce57..c4f71e57a8174c62cf331e4afec35604786282a0 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -36,7 +36,6 @@ typedef struct SPlanContext { int64_t watermark; char* pMsg; int32_t msgLen; - // double filesFactor; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2c9d66a828e658cd286d2e5bf4edcc28d7ea37a9..2b3a1f265089bf3f10caee10458d035ac2f90169 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -152,7 +152,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); typedef struct { char* qmsg; // followings are not applicable to encoder and decoder - void* inputHandle; + // void* inputHandle; void* executor; } STaskExec; @@ -240,12 +240,13 @@ struct SStreamTask { int8_t inputType; int8_t status; - int8_t sourceType; int8_t execType; int8_t sinkType; int8_t dispatchType; int16_t dispatchMsgType; + int8_t dataScan; + // node info int32_t childId; int32_t nodeId; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 66ab627e3245678fe1b04ab9c8d2b8876022361a..52a69f95b47da1238e7c4b05c39ed53e867cf4ad 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -95,9 +95,12 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 34a205232e69f057d879188fe4ed98df5b378e3a..8d93ddd66c7fb8aa27b550ebaa497739ae662545 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -55,7 +55,9 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num taosGetQitem(qall, (void **)&pMsg); dTrace("msg:%p, get from snode-unique queue", pMsg); - sndProcessUMsg(pMgmt->pSnode, pMsg); + if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) { + ASSERT(0); + } dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); @@ -67,7 +69,9 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from snode-shared queue", pMsg); - sndProcessSMsg(pMgmt->pSnode, pMsg); + if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) { + ASSERT(0); + } dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 09fdd4b087f739dc29e86484a45f48f1af13d412..95dd5732c6a7ad1de2c0c8d3f753445163a6c3e1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -170,10 +170,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg); break; - case MERGE_QUEUE: - dTrace("vgId:%d, msg:%p put into vnode-merge queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pMergeQ, pMsg); - break; case APPLY_QUEUE: dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pApplyQ, pMsg); @@ -196,8 +192,6 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } -int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } - int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dTrace("msg:%p, put into vnode-mgmt queue", pMsg); taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg); @@ -243,9 +237,6 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { case FETCH_QUEUE: size = taosQueueItemSize(pVnode->pFetchQ); break; - case MERGE_QUEUE: - size = taosQueueItemSize(pVnode->pMergeQ); - break; default: break; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 39bb6798aa869277bd6e510829d3049e0bd02b87..3ff0c39bc31a19f47aace0bac90c5aac25707342 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -63,9 +63,8 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 .topicQuery = false, .streamQuery = true, .rSmaQuery = true, - .triggerType = STREAM_TRIGGER_AT_ONCE, + .triggerType = triggerType, .watermark = watermark, - /*.filesFactor = filesFactor,*/ }; if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { @@ -270,7 +269,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); // source - pTask->sourceType = TASK_SOURCE__MERGE; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // exec @@ -316,7 +314,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* #endif pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); // source - pTask->sourceType = TASK_SOURCE__MERGE; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // exec @@ -427,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskSourceLevel, pTask); + pTask->dataScan = 1; + // input pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; @@ -470,6 +469,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskOneLevel, pTask); + pTask->dataScan = 1; + // input pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 2802537dcd31323a703cd6a4d2222bb8fd9d3d68..8916e2a31cd79c3486c00841979e14fcaf25b2df 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -56,7 +56,6 @@ SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId); int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); - int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index cbbe071c5fa386e178273303ae7d2b35bf1b7ad2..8ef48ccbf9324888cad8b227fb7fa434843078aa 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -76,45 +76,158 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t)); } -static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) { - /*SStreamExecMsgHead *pHead = pMsg->pCont;*/ - /*int32_t taskId = pHead->streamTaskId;*/ - /*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/ - /*if (pTask == NULL) {*/ - /*return -1;*/ - /*}*/ +static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + char *msg = pMsg->pCont; + int32_t msgLen = pMsg->contLen; + + SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t *)msg, msgLen); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + } + tDecoderClear(&decoder); + + pTask->status = TASK_STATUS__IDLE; + + pTask->inputQueue = streamQueueOpen(); + pTask->outputQueue = streamQueueOpen(); + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->outputStatus = TASK_INPUT_STATUS__NORMAL; + + if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; + + pTask->pMsgCb = &pNode->msgCb; + + ASSERT(pTask->execType != TASK_EXEC__NONE); + + SReadHandle handle = { + .pMsgCb = &pNode->msgCb, + }; + + /*pTask->exec.inputHandle = NULL;*/ + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.executor); + + streamSetupTrigger(pTask); + + qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId); + + return 0; + +FAIL: + if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); + if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); + return -1; +} + +static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + SStreamTaskRunReq *pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + streamTaskProcessRunReq(pTask, &pNode->msgCb); return 0; } -void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { +static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + char *msgStr = pMsg->pCont; + char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamDispatchReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeStreamDispatchReq(&decoder, &req); + int32_t taskId = req.taskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp); + return 0; +} + +static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + SStreamTaskRecoverReq *pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg); + return 0; +} + +static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t taskId = pRsp->taskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp); + return 0; +} + +static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + SStreamTaskRecoverRsp *pRsp = pMsg->pCont; + int32_t taskId = pRsp->taskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + streamProcessRecoverRsp(pTask, pRsp); + return 0; +} + +static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + char *msg = pMsg->pCont; + int32_t msgLen = pMsg->contLen; + SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; + int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t)); + ASSERT(code == 0); + if (code == 0) { + // sendrsp + } + return code; +} + +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deploy // stream stop/resume // operator exec - if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) { - void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask)); - if (pTask == NULL) { + switch (pMsg->msgType) { + case TDMT_STREAM_TASK_DEPLOY: + return sndProcessTaskDeployReq(pSnode, pMsg); + case TDMT_VND_STREAM_TASK_DROP: + return sndProcessTaskDropReq(pSnode, pMsg); + default: ASSERT(0); - return; - } - SDecoder decoder; - tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead)); - tDecodeSStreamTask(&decoder, pTask); - tDecoderClear(&decoder); - - sndMetaDeployTask(pSnode->pMeta, pTask); - /*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ - /*sndProcessTaskExecReq(pSnode, pMsg);*/ - } else { - ASSERT(0); } + return 0; } -void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { - // operator exec - /*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ - /*sndProcessTaskExecReq(pSnode, pMsg);*/ - /*} else {*/ - ASSERT(0); - /*}*/ +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { + switch (pMsg->msgType) { + case TDMT_STREAM_TASK_RUN: + return sndProcessTaskRunReq(pSnode, pMsg); + case TDMT_STREAM_TASK_DISPATCH: + return sndProcessTaskDispatchReq(pSnode, pMsg); + case TDMT_STREAM_TASK_RECOVER: + return sndProcessTaskRecoverReq(pSnode, pMsg); + case TDMT_STREAM_TASK_DISPATCH_RSP: + return sndProcessTaskDispatchRsp(pSnode, pMsg); + case TDMT_STREAM_TASK_RECOVER_RSP: + return sndProcessTaskRecoverRsp(pSnode, pMsg); + default: + ASSERT(0); + } + return 0; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e83b992f06c39250b5e5b48a37b60843c4504823..70d89102e67dd933a6228402f647a212d953f63a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,7 +116,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, void *pMemRef); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d399b5e3bf0001128dcdda812a875dca163d919b..70b6e24b07c4a990921517684d4d65ce5f677a34 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, void* pMemRef); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 70b09ec701af09888f37434367b72bcfe3c46938..06a119b076af7062ef6da9b7d02073d503818102 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -125,10 +125,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { if (offset.type == TMQ_OFFSET__SNAPSHOT) { tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, - pTq->pVnode->config.vgId, offset.uid, offset.ts); + TD_VID(pTq->pVnode), offset.uid, offset.ts); } else if (offset.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey, - pTq->pVnode->config.vgId, offset.version); + TD_VID(pTq->pVnode), offset.version); } else { ASSERT(0); } @@ -159,7 +159,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pOffset != NULL) { ASSERT(pOffset->type == TMQ_OFFSET__LOG); tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey, - pTq->pVnode->config.vgId, pOffset->version); + TD_VID(pTq->pVnode), pOffset->version); fetchOffset = pOffset->version + 1; } else { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { @@ -167,13 +167,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { fetchOffset = walGetCommittedVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) { - tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, - pTq->pVnode->config.vgId, pReq->subKey); + tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode), + pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return -1; } tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey, - pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); } } @@ -183,14 +183,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { - tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId, + tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode), pReq->subKey); return -1; } if (pHandle->consumerId != consumerId) { tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", - consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); + consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); return -1; } @@ -304,7 +304,6 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -// TODO: persist meta into tdb int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); @@ -346,10 +345,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList); - tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid); + tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid); + tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid); } for (int32_t i = 0; i < 5; i++) { tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList); @@ -400,16 +399,21 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { // exec if (pTask->execType != TASK_EXEC__NONE) { // expand runners - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - .vnode = pTq->pVnode, - }; - pTask->exec.inputHandle = pStreamReader; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.executor); + if (pTask->dataScan) { + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, + }; + /*pTask->exec.inputHandle = pStreamReader;*/ + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.executor); + } else { + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); + ASSERT(pTask->exec.executor); + } } // sink @@ -431,7 +435,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { streamSetupTrigger(pTask); - tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId); + tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); @@ -464,7 +468,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { continue; } - if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { + if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) { continue; } } else { @@ -534,9 +538,9 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + ASSERT(code == 0); if (code == 0) { // sendrsp } - ASSERT(code == 0); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7a40eeeb94f8b81984f0878d52c247be9c9c322..ce73246e51adcb3906b16a56084d44cf2fb0bed4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi } if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*)pTsdbReadHandle; + return (tsdbReaderT)pTsdbReadHandle; } // todo apply the lastkey of table check to avoid to load header file diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 00643be0e191c787e3d30f2c37784942d6cbc14f..ae0f669d656a458a310339e461ad2e55918ee211 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -412,6 +412,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); @@ -426,27 +427,57 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, "I/O: "); int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); - for (int32_t i = 0; i < nodeNum; ++i) { + struct STableScanAnalyzeInfo info = {0}; + + int32_t maxIndex = 0; + int32_t totalRows = 0; + for(int32_t i = 0; i < nodeNum; ++i) { SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i); STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo; - EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + info.totalBlocks += pScanInfo->totalBlocks; + info.loadBlocks += pScanInfo->loadBlocks; + info.totalRows += pScanInfo->totalRows; + info.skipBlocks += pScanInfo->skipBlocks; + info.filterTime += pScanInfo->filterTime; + info.loadBlockStatis += pScanInfo->loadBlockStatis; + info.totalCheckedRows += pScanInfo->totalCheckedRows; + info.filterOutBlocks += pScanInfo->filterOutBlocks; + + if (pScanInfo->totalRows > totalRows) { + totalRows = pScanInfo->totalRows; + maxIndex = i; + } + } - EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("total_blocks=%.1f", ((double)info.totalBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("load_blocks=%.1f", ((double)info.loadBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("load_block_SMAs=%.1f", ((double)info.loadBlockStatis) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - } + EXPLAIN_ROW_APPEND("total_rows=%.1f", ((double)info.totalRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("check_rows=%.1f", ((double)info.totalCheckedRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_END(); + + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + //Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms. + SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex); + STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo; + + EXPLAIN_ROW_NEW(level + 1, " "); + EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd", + execInfo->startupCost, execInfo->totalCost); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 756eb5f3758514d201b44c3d914bf1b46f5b871e..c33b6622e3a5b4e341517478a6a0195e81800434 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -740,7 +742,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b1d076e8f57e1970fc42e0834032d374e8ed149e..c99b3c1058d8f1e87312261ccdb1f39c3c526828 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -99,7 +99,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO } qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { - if (msg == NULL || streamReadHandle == NULL) { + if (msg == NULL) { return NULL; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ff281dacbdbae5d1cdf600fc6fb44ad2a4d2bdec..a9e1e03178b0a28780ecfdd696d44e244d2648a1 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tref.h" #include "dataSinkMgt.h" +#include "os.h" #include "tmsg.h" +#include "tref.h" #include "tudf.h" #include "executor.h" @@ -24,15 +24,13 @@ #include "query.h" static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; -int32_t exchangeObjRefPool = -1; +int32_t exchangeObjRefPool = -1; -static void initRefPool() { - exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); -} +static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { - assert(readHandle != NULL && pSubplan != NULL); + assert(pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); @@ -47,57 +45,57 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (code != TSDB_CODE_SUCCESS) { goto _error; } - + if (handle) { void* pSinkParam = NULL; code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - + code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); } - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } #ifdef TEST_IMPL // wait moment -int waitMoment(SQInfo* pQInfo){ - if(pQInfo->sql) { - int ms = 0; +int waitMoment(SQInfo* pQInfo) { + if (pQInfo->sql) { + int ms = 0; char* pcnt = strstr(pQInfo->sql, " count(*)"); - if(pcnt) return 0; - + if (pcnt) return 0; + char* pos = strstr(pQInfo->sql, " t_"); - if(pos){ + if (pos) { pos += 3; ms = atoi(pos); - while(*pos >= '0' && *pos <= '9'){ - pos ++; + while (*pos >= '0' && *pos <= '9') { + pos++; } char unit_char = *pos; - if(unit_char == 'h'){ - ms *= 3600*1000; - } else if(unit_char == 'm'){ - ms *= 60*1000; - } else if(unit_char == 's'){ + if (unit_char == 'h') { + ms *= 3600 * 1000; + } else if (unit_char == 'm') { + ms *= 60 * 1000; + } else if (unit_char == 's') { ms *= 1000; } } - if(ms == 0) return 0; + if (ms == 0) return 0; printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql); - - if(ms < 1000) { + + if (ms < 1000) { taosMsleep(ms); } else { int used_ms = 0; - while(used_ms < ms) { + while (used_ms < ms) { taosMsleep(1000); used_ms += 1000; - if(isTaskKilled(pQInfo)){ + if (isTaskKilled(pQInfo)) { printf("test check query is canceled, sleep break.%s\n", pQInfo->sql); break; } @@ -108,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){ } #endif -int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); *pRes = NULL; int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, - (void*)curOwner); + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; } @@ -152,18 +149,18 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { cleanUpUdfs(); - int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; + int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", - GET_TASKID(pTaskInfo), current, total, 0, el/1000.0); + GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; } int32_t qKillTask(qTaskInfo_t qinfo) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -182,7 +179,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) { } int32_t qAsyncKillTask(qTaskInfo_t qinfo) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -195,7 +192,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { } int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -205,18 +202,18 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { } void qDestroyTask(qTaskInfo_t qTaskHandle) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; - qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle; + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); - queryCostStatis(pTaskInfo); // print the query cost summary + queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo; - int32_t capacity = 0; +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int32_t capacity = 0; - return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); + return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); } int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 58918667f309c1730224c54b43b963d74be7f66b..5c038ed7098d79f2de2e5d36b19724aa916f096c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -141,8 +141,8 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId); - +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, + uint64_t groupId); // setup the output buffer for each operator static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { @@ -1230,8 +1230,9 @@ void initResultRow(SResultRow* pResultRow) { * offset[0] offset[1] offset[2] */ // TODO refactor: some function move away -void setFunctionResultOutput(SOperatorInfo *pOperator, SOptrBasicInfo *pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; +void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, + int32_t numOfExprs) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; @@ -1380,9 +1381,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId) { +void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, + uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; @@ -2040,8 +2042,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, - SArray* pColList) { + int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, + SArray* pColList) { if (pColList == NULL) { // data from other sources blockCompressDecode(pRes, numOfOutput, numOfRows, pData); pRes->info.rows = numOfRows; @@ -2165,8 +2167,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; - code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); + code = + extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, + pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); goto _error; @@ -2281,7 +2284,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; int32_t code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); + pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 @@ -2673,8 +2676,8 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, - numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); @@ -2812,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan // todo add more information about exchange operation int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) { + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -2840,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SExprSupp* pSup = &pOperator->exprSupp; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); @@ -3130,7 +3133,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pSup = &pOperator->exprSupp; SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); @@ -3403,8 +3406,8 @@ void cleanupAggSup(SAggSupporter* pAggSup) { destroyDiskbasedBuf(pAggSup->pResultBuf); } -int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - size_t keyBufSize, const char* pkey) { +int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey) { initExprSupp(pSup, pExprInfo, numOfCols); doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); for (int32_t i = 0; i < numOfCols; ++i) { @@ -3457,12 +3460,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* initBasicInfo(&pInfo->binfo, pResultBlock); initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); - pInfo->groupId = INT32_MIN; - pOperator->name = "TableAggregate"; + pInfo->groupId = INT32_MIN; + pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, @@ -3578,25 +3581,26 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) return pList; } -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, + SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - pInfo->limit = limit; - pInfo->slimit = slimit; - pInfo->curOffset = limit.offset; - pInfo->curSOffset = slimit.offset; - pInfo->binfo.pRes = pResBlock; + pInfo->limit = limit; + pInfo->slimit = slimit; + pInfo->curOffset = limit.offset; + pInfo->curSOffset = slimit.offset; + pInfo->binfo.pRes = pResBlock; pInfo->pFilterNode = pProjPhyNode->node.pConditions; int32_t numOfRows = 4096; @@ -3614,12 +3618,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); - pOperator->name = "ProjectOperator"; + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -3639,7 +3643,7 @@ _error: static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { SIndefOperatorInfo* pIndefInfo = pOperator->info; SOptrBasicInfo* pInfo = &pIndefInfo->binfo; - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pSup = &pOperator->exprSupp; SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); @@ -3677,7 +3681,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { SExprSupp* pScalarSup = &pIndefInfo->scalarSup; if (pScalarSup->pExprInfo != NULL) { code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs, - pIndefInfo->pPseudoColInfo); + pIndefInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -3686,8 +3690,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pOperator->exprSupp.numOfExprs, - pIndefInfo->pPseudoColInfo); + code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, + pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -3745,14 +3749,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pInfo->binfo.pRes = pResBlock; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - pOperator->name = "IndefinitOperator"; + pOperator->name = "IndefinitOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfExpr; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, destroyIndefinitOperatorInfo, NULL, NULL, NULL); @@ -3827,11 +3831,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - int32_t num = 0; - SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num); - SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval; - int32_t type = convertFillType(pPhyFillNode->mode); + int32_t num = 0; + SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num); + SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval; + int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(pOperator, 4096); @@ -3842,16 +3846,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = pResBlock; + pInfo->pRes = pResBlock; pInfo->multigroupResult = multigroupResult; - pOperator->name = "FillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; + pOperator->name = "FillOperator"; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = num; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = num; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); @@ -4050,10 +4054,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STimeWindowAggSupp twSup = { .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; tsdbReaderT pDataReader = NULL; - if (pHandle->vnode) { - pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); - } else { - getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond); + + if (pHandle) { + if (pHandle->vnode) { + pDataReader = + doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); + } else { + getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond); + } } if (pDataReader == NULL && terrno != 0) { @@ -4087,6 +4095,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { + SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode; + pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + + if (pBlockNode->tableType == TSDB_SUPER_TABLE) { + int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; + return NULL; + } + } else { // Create one table group. + STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; + taosArrayPush(pTableListInfo->pTableList, &info); + } + + SQueryTableDataCond cond = {0}; + + { + cond.order = TSDB_ORDER_ASC; + cond.numOfCols = 1; + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + cond.colList->colId = 1; + cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; + cond.colList->bytes = sizeof(TSKEY); + + cond.numOfTWindows = 1; + cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow)); + cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + cond.suid = pBlockNode->suid; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + } + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + cleanupQueryTableDataCond(&cond); + + return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); } else { ASSERT(0); } @@ -4270,7 +4318,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { SColumn c = {0}; c.slotId = pNode->slotId; c.colId = pNode->slotId; - c.type = pValNode->node.type; + c.type = pValNode->node.type; c.bytes = pValNode->node.resType.bytes; c.scale = pValNode->node.resType.scale; c.precision = pValNode->node.resType.precision; @@ -4284,8 +4332,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { - int32_t code = - getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); + int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4374,7 +4421,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) { ASSERT(length == *(int32_t*)result); const char* data = result + sizeof(int32_t); - code = ops->fpSet.decodeResultRow(ops, (char*) data); + code = ops->fpSet.decodeResultRow(ops, (char*)data); if (code != TDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6f0187fa5315f5052d63d0a73879b7df5428a08d..c47e9dd4aee34289f975ae7d3d3d32a6536be52f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -367,13 +368,14 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = pTableScanInfo->pResBlock; int64_t st = taosGetTimestampUs(); while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -396,7 +398,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupId) { pBlock->info.groupId = *groupId; } @@ -525,7 +527,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - //taosSsleep(20); + // taosSsleep(20); SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -589,44 +591,76 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; } +static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { + int32_t rowLen = 0; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pMeta, 0); + metaGetTableEntryByUid(&mr, uid); + if (mr.me.type == TSDB_SUPER_TABLE) { + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_CHILD_TABLE) { + uint64_t suid = mr.me.ctbEntry.suid; + metaGetTableEntryByUid(&mr, suid); + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; + } + } + + metaReaderClear(&mr); + return rowLen; +} + static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - STableScanInfo* pTableScanInfo = pOperator->info; + SBlockDistInfo* pBlockScanInfo = pOperator->info; - STableBlockDistInfo blockDistInfo = {0}; - blockDistInfo.maxRows = INT_MIN; - blockDistInfo.minRows = INT_MAX; + STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; + blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid); - tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - pBlock->info.rows = 1; + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); + blockDataEnsureCapacity(pBlock, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); + pBlock->info.rows = 1; + pOperator->status = OP_EXEC_DONE; return pBlock; } @@ -636,7 +670,8 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pDistInfo->pResBlock); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, + SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -644,21 +679,20 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; - - pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; - - taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); + pInfo->pHandle = dataReader; + pInfo->readHandle = *readHandle; + pInfo->uid = uid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); - pOperator->name = "DataBlockInfoScanOperator"; - // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); + + pOperator->name = "DataBlockDistScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, @@ -706,9 +740,8 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup; int64_t gap = pInfo->sessionSup.gap; int32_t winIndex = 0; - SResultWindowInfo* pCurWin = - getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, - pSDB->info.groupId, gap, &winIndex); + SResultWindowInfo* pCurWin = + getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); win = pCurWin->win; pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL); @@ -789,23 +822,23 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { if (!pResult) { return NULL; } - + if (pResult->info.groupId == pInfo->groupId) { return pResult; } } -/* Todo(liuyao) for partition by column - SSDataBlock* pBlock = createOneDataBlock(pResult, true); - blockDataCleanup(pResult); - for (int32_t i = 0; i < pBlock->info.rows; i++) { - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i); - if (id == pInfo->groupId) { - copyOneRow(pResult, pBlock, i); + /* Todo(liuyao) for partition by column + SSDataBlock* pBlock = createOneDataBlock(pResult, true); + blockDataCleanup(pResult); + for (int32_t i = 0; i < pBlock->info.rows; i++) { + uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i); + if (id == pInfo->groupId) { + copyOneRow(pResult, pBlock, i); + } } - } - return pResult; -*/ + return pResult; + */ } static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { @@ -820,7 +853,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); int32_t i = 0; - for ( ; i < size; i++) { + for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); if (pInfo->groupId != id) { @@ -1050,9 +1083,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - - STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); @@ -1069,16 +1099,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan } } - // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); - SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); - int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); - if (code != 0) { - taosArrayDestroy(tableIdList); - goto _error; - } - taosArrayDestroy(tableIdList); - pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1090,10 +1110,31 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan goto _error; } - if (pSTInfo->interval.interval > 0 && pDataReader) { - pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); - } else { - pInfo->pUpdateInfo = NULL; + if (pHandle) { + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; + if (pSTInfo->interval.interval > 0) { + pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); + } else { + pInfo->pUpdateInfo = NULL; + } + pInfo->pOperatorDumy = pTableScanDummy; + pInfo->interval = pSTInfo->interval; + + pInfo->readHandle = *pHandle; + ASSERT(pHandle->reader); + pInfo->streamBlockReader = pHandle->reader; + pInfo->tableUid = pScanPhyNode->uid; + + // set the extract column id to streamHandle + tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); + SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); + int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); + if (code != 0) { + taosArrayDestroy(tableIdList); + goto _error; + } + taosArrayDestroy(tableIdList); } // create the pseduo columns info @@ -1101,19 +1142,14 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); } - pInfo->readHandle = *pHandle; - pInfo->tableUid = pScanPhyNode->uid; - pInfo->streamBlockReader = pHandle->reader; pInfo->pRes = createResDataBlock(pDescNode); pInfo->pUpdateRes = createResDataBlock(pDescNode); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->pDataReader = pDataReader; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->pOperatorDumy = pTableScanDummy; - pInfo->interval = pSTInfo->interval; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->groupId = 0; - + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blocking = false; @@ -1392,15 +1428,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // table comment pColInfoData = taosArrayGet(p->pDataBlock, 8); - if(pInfo->pCur->mr.me.ctbEntry.commentLen > 0) { + if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) { char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment); colDataAppend(pColInfoData, numOfRows, comment, false); - }else if(pInfo->pCur->mr.me.ctbEntry.commentLen == 0) { + } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) { char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, ""); colDataAppend(pColInfoData, numOfRows, comment, false); - }else{ + } else { colDataAppendNULL(pColInfoData, numOfRows); } @@ -1428,15 +1464,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // table comment pColInfoData = taosArrayGet(p->pDataBlock, 8); - if(pInfo->pCur->mr.me.ntbEntry.commentLen > 0) { + if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) { char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment); colDataAppend(pColInfoData, numOfRows, comment, false); - }else if(pInfo->pCur->mr.me.ntbEntry.commentLen == 0) { + } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) { char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, ""); colDataAppend(pColInfoData, numOfRows, comment, false); - }else{ + } else { colDataAppendNULL(pColInfoData, numOfRows); } @@ -1536,7 +1572,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, - pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols); + pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); @@ -1833,7 +1869,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + + + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); pInfo->pTableList = pTableListInfo; pInfo->pColMatchInfo = colList; @@ -1842,13 +1881,12 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->curPos = 0; pInfo->pFilterNode = pPhyNode->node.pConditions; - pOperator->name = "TagScanOperator"; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfExprs; pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); @@ -1918,8 +1956,7 @@ typedef struct STableMergeScanInfo { int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { - int32_t code = - getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); + int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1956,7 +1993,7 @@ _error: static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableMergeScanInfo* pInfo = pOperator->info; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; @@ -2143,9 +2180,8 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = - tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, - numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); @@ -2209,8 +2245,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = - getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -2243,20 +2278,20 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; - SSortExecInfo sortExecInfo; + SSortExecInfo sortExecInfo; } STableMergeScanExecInfo; int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); - STableMergeScanInfo* pInfo = pOptr->info; + STableMergeScanInfo* pInfo = pOptr->info; execInfo->blockRecorder = pInfo->readRecorder; execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); *pOptrExplain = execInfo; *len = sizeof(STableMergeScanExecInfo); - + return TSDB_CODE_SUCCESS; } @@ -2271,8 +2306,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SArray* pColList = - extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -2286,16 +2320,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; - pInfo->sample.seed = taosGetTimestampSec(); - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReaders = dataReaders; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; - pInfo->curTWinIdx = 0; + pInfo->sample.seed = taosGetTimestampSec(); + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; pInfo->pResBlock = createResDataBlock(pDescNode); @@ -2313,22 +2347,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; - pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->bufPageSize = getProperSortPageSize(rowSize); // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result - pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1); - pInfo->hasGroupId = false; + pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1); + pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; - pOperator->name = "TableMergeScanOperator"; + pOperator->name = "TableMergeScanOperator"; // TODO : change it pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 1024); pOperator->fpSet = diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c243c1c175ee0e3a909af3db4252127bf9aa404d..e691d562c6b4426015a7a7d10906d6c4b307cb5f 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx); int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 522ee09b3cac7e4865be72ab381ff6853bf1301d..cfad00f45898a60dc196edae522c67246d5afb69 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } -int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { +int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { int32_t code = nodesListMakeAppend(pParameters, pPartialRes); if (TSDB_CODE_SUCCESS == code) { code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1))); @@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL return TSDB_CODE_SUCCESS; } -static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { - int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); - - if (isPartial) { - if (2 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // param1 - SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); - if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - SValueNode* pValue = (SValueNode*)pParamNode1; - if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - if (pValue->datum.i < 1 || pValue->datum.i > 100) { - return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); - } - - pValue->notReserved = true; - - // set result type - pFunc->node.resType = - (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; - } else { - if (1 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (TSDB_DATA_TYPE_BINARY != para1Type) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // Do nothing. We can only access output of partial functions as input, - // so original input type cannot be obtained, resType will be set same - // as original function input type after merge function created. - } - return TSDB_CODE_SUCCESS; -} - -static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, true); -} - -static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, false); -} - static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = topFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, - .pPartialFunc = "_top_partial", - .pMergeFunc = "_top_merge", - // .createMergeParaFuc = topCreateMergePara - }, - { - .name = "_top_partial", - .type = FUNCTION_TYPE_TOP_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = topFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = topCombine, - }, - { - .name = "_top_merge", - .type = FUNCTION_TYPE_TOP_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = topFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = topCombine, + .pPartialFunc = "top", + .pMergeFunc = "top", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "bottom", @@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, - .pPartialFunc = "_bottom_partial", - .pMergeFunc = "_bottom_merge" - }, - { - .name = "_bottom_partial", - .type = FUNCTION_TYPE_BOTTOM_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = bottomFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = bottomCombine, - }, - { - .name = "_bottom_merge", - .type = FUNCTION_TYPE_BOTTOM_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = bottomFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = bottomCombine, + .pPartialFunc = "bottom", + .pMergeFunc = "bottom", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "spread", @@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .initFunc = functionSetup, .processFunc = NULL, - .finalizeFunc = NULL + .finalizeFunc = NULL, + .pPartialFunc = "_select_value", + .pMergeFunc = "_select_value" }, { .name = "_block_dist", @@ -2532,6 +2432,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, .getEnvFunc = getBlockDistFuncEnv, + .initFunc = blockDistSetup, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7440df062f89d392bfa2380983fd3a0e822e67d7..076ae6146021b33179eb2271086c4b5a613176fd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -67,8 +67,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; - int16_t type; // store the original input type, used in merge function - int32_t numOfItems; + int16_t type; STopBotResItem* pItems; } STopBotRes; @@ -2872,12 +2871,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - // intermediate result is binary and length contains VAR header size - pEnv->calcMemSize = pFunc->node.resType.bytes; - return true; -} - bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { if (!functionSetup(pCtx, pResInfo)) { return false; @@ -2952,50 +2945,6 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { - for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); - } -} - -int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, true); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - -int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, false); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) { uint16_t type = *(uint16_t*)param; @@ -3044,8 +2993,6 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -3144,7 +3091,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS releaseBufPage(pCtx->pBuf, pPage); } -int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) { +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -3164,39 +3111,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - if (!isMerge) { - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); - } + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); currentRow += 1; } return pEntryInfo->numOfRes; } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); } - -int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - return topBotFinalizeImpl(pCtx, pBlock, true); -} - -int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getTopBotInfoSize(pRes->maxSize); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - - memcpy(varDataVal(res), pRes, resultBytes); - varDataSetLen(res, resultBytes); - - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - - colDataAppend(pCol, pBlock->info.rows, res, false); - - taosMemoryFree(res); - return 1; -} - void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -3211,8 +3132,6 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -5004,7 +4923,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->minRows = INT32_MAX; + return true; +} + int32_t blockDistFunction(SqlFunctionCtx* pCtx) { + const int32_t BLOCK_DIST_RESULT_ROWS = 24; + SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -5022,6 +4953,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; + pDistInfo->defMinRows = p1.defMinRows; + pDistInfo->defMaxRows = p1.defMaxRows; + pDistInfo->rowSize = p1.rowSize; + pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks; + if (pDistInfo->minRows > p1.minRows) { pDistInfo->minRows = p1.minRows; } @@ -5033,7 +4969,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; } - pResInfo->numOfRes = 1; + pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows return TSDB_CODE_SUCCESS; } @@ -5045,7 +4981,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; @@ -5076,7 +5012,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; @@ -5098,32 +5034,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* pData = GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t row = 0; - - STableBlockDistInfo info = {0}; - tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info); - char st[256] = {0}; + double totalRawSize = pData->totalRows * pData->rowSize; int32_t len = - sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", - info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0), - info.totalSize / (info.totalRows * info.rowSize * 1.0)); + sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", + pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks, + pData->totalSize * 100 / totalRawSize, '%'); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]", - info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]", + pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, - info.numOfFiles, 0); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, + pData->numOfFiles, 0); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); @@ -5135,40 +5068,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t maxVal = 0; int32_t minVal = INT32_MAX; - for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) { - if (maxVal < info.blockRowsHisto[i]) { - maxVal = info.blockRowsHisto[i]; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + if (maxVal < pData->blockRowsHisto[i]) { + maxVal = pData->blockRowsHisto[i]; } - if (minVal > info.blockRowsHisto[i]) { - minVal = info.blockRowsHisto[i]; + if (minVal > pData->blockRowsHisto[i]) { + minVal = pData->blockRowsHisto[i]; } } int32_t delta = maxVal - minVal; int32_t step = delta / 50; + if (step == 0) { + step = 1; + } + + int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]); + int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets; + + bool singleModel = false; + if (bucketRange == 0) { + singleModel = true; + step = 20; + bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets; + } - int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); - int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1)); - for (int32_t i = 0; i < 20; ++i) { - len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1)); + int32_t num = 0; + if (singleModel && pData->blockRowsHisto[i] > 0) { + num = 20; + } else { + num = (pData->blockRowsHisto[i] + step - 1) / step; + } - int32_t num = (info.blockRowsHisto[i] + step - 1) / step; for (int32_t j = 0; j < num; ++j) { int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|'); len += x; } - double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks; - len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%'); + double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks; + len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%'); printf("%s\n", st); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); } - return row; + return TSDB_CODE_SUCCESS; } typedef struct SDerivInfo { diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index e3218f972b0076e2d6ca3f7f2e303383658b24cb..60589784511dbe2eab5ab16906f5149e7e2bd5da 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -419,6 +419,24 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); } +static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) { + SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; + strcpy(name.dbname, pStmt->dbName); + strcpy(name.tname, pStmt->tableName); + int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS == code) { + code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); + } + return code; +} + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -497,6 +515,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_DELETE_STMT: return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); + case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: + return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); default: break; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a35e7679a121c9799a61a7973cbb1b3fc3898a5a..b5a63d937ae6be238406839921ba8a514568001a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -36,11 +36,11 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1; if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; @@ -84,11 +84,11 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;