diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 01b142e333d39246b378ef7cb5b4b2291993c61d..b468456cb78e193cf83ff932699ac1ee8661eb77 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq { char* sql; char* logicalPlan; char* physicalPlan; - SArray* tasks; // SArray + SSubQueryMsg msg; } SMqSetCVgReq; +static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen; + if (buf == NULL) return tlen; + memcpy(*buf, pMsg, tlen); + *buf = POINTER_SHIFT(*buf, tlen); + return tlen; +} + +static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen; + memcpy(pMsg, buf, tlen); + return POINTER_SHIFT(buf, tlen); +} + static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->vgId); @@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - pReq->tasks = NULL; + buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 61970ff44014f975a13f987ecc03cb17d960f8ba..a98bb8f51a1b8bf8bb267bc95158f5321bfb54c0 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -26,6 +26,13 @@ typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SSubplan; + /** + * Create the exec task for streaming mode + * @param pMsg + * @param pStreamBlockReadHandle + * @return + */ +qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); /** * Create the exec task object according to task json @@ -203,4 +210,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo); } #endif -#endif /*_TD_EXECUTOR_H_*/ \ No newline at end of file +#endif /*_TD_EXECUTOR_H_*/ diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 39d8ec34203accf5a8842a733334c5ec9ef1318e..b15d5ad33aa2556bf915d07617ea84833fedbeb4 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc { bool interpQuery; bool distinct; bool join; + bool continueQuery; } SMultiFunctionsDesc; int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 2793e726358bd54126a649615981a21b4fea2a4a..42d3307ac851515e6f914c9a19b8039c1b10666e 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -23,6 +23,7 @@ #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME #endif +OP_ENUM_MACRO(StreamScan) OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableSeqScan) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 02207c4d1ba8c877790f970aeb82e8d12cc34809..1925f0e3bda380b375b065b183ccce53cc2f994b 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -109,12 +109,71 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -const SSchema* tGetTbnameColumnSchema(); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr { + int32_t nodeId; // vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) { + pEpSet->inUse = pAddr->inUse; + pEpSet->numOfEps = pAddr->numOfEps; + for (int j = 0; j < TSDB_MAX_REPLICA; j++) { + pEpSet->port[j] = pAddr->epAddr[j].port; + memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN); + } +} + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + void initQueryModuleMsgHandle(); +const SSchema* tGetTbnameColumnSchema(); +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); + #define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 25e295f980e3de14cfedff8c480fdfb5be62cf41..5afafa08a38e8ce1c6acf69b527184582a0c764d 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); - -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr { - int32_t nodeId; // vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 34ab1fb05ab37dcb76ee1179a6c11d035349cd32..159a92b0ab4c3ca1b26c05121a448ac2a30a1d94 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); + SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode; + pQueryStmtInfo->info.continueQuery = true; + // todo check for invalid sql statement and return with error code CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); @@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, goto _return; } + printf("%s\n", pStr); + // The topic should be related to a database that the queried table is belonged to. SName name = {0}; char dbName[TSDB_DB_FNAME_LEN] = {0}; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 440ef0d728b82e1c17c6e44e3ef1976e834df419..736a47273fd68fa9fa15fc2f3d8d0bb821b360f3 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) { // taosHashCleanup(phash); //} // -//TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == nullptr); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// char* sql = "select * from tu"; -// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -// taos_free_result(pRes); -// taos_close(pConn); -//} -// +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); @@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) { // taos_close(pConn); //} -TEST(testCase, agg_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use dbv"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "select count(*) from tu"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} \ No newline at end of file +//TEST(testCase, agg_query_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use dbv"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select count(*) from tu"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 772b9bf07919a3cb643c91408d224083e6513210..1507e2a30d472cd5a9bed64ec6433e24fdf4091c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -327,25 +327,30 @@ typedef struct SMqTopicConsumer { #endif typedef struct SMqConsumerEp { - int32_t vgId; // -1 for unassigned - SEpSet epset; - int64_t consumerId; // -1 for unassigned - int64_t lastConsumerHbTs; - int64_t lastVgHbTs; + int32_t vgId; // -1 for unassigned + SEpSet epSet; + int64_t consumerId; // -1 for unassigned + int64_t lastConsumerHbTs; + int64_t lastVgHbTs; + int32_t execLen; + SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); - tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); + tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); return tlen; } static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); - buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); + buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); + pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; return buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4cf7505f74ec214606f44b86a40b696aad4cd5f0..817ca4f4bede60d3ea76bd4edd73e4bfaf8e6a91 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { strcpy(req.sql, pTopic->sql); strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.physicalPlan, pTopic->physicalPlan); + memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -116,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // persist msg STransAction action = {0}; - action.epSet = pCEp->epset; + action.epSet = pCEp->epSet; action.pCont = reqStr; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; @@ -141,20 +142,24 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { - SMqConsumerEp CEp; - CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - int32_t sz; - SVgObj *pVgroup = NULL; - SSdb *pSdb = pMnode->pSdb; - void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); - while (pIter != NULL) { - if (pVgroup->dbUid == pTopic->dbUid) { - CEp.epset = mndGetVgroupEpset(pMnode, pVgroup); - CEp.vgId = pVgroup->vgId; - taosArrayPush(unassignedVg, &CEp); - } - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + //convert phyplan to dag + SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); + SArray *pArray; + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + return -1; } + int32_t sz = taosArrayGetSize(pArray); + //convert dag to msg + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp CEp; + CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; + STaskInfo* pTaskInfo = taosArrayGet(pArray, i); + tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); + CEp.vgId = pTaskInfo->addr.nodeId; + taosArrayPush(unassignedVg, &CEp); + } + + qDestroyQueryDag(pDag); return 0; } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 75734d2d29a0794c7c38284dc9d7cbf0b94bbf56..e625c56db177f6d4e1b2879796ebca547ca3fb70 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -26,6 +26,7 @@ target_link_libraries( PUBLIC tfs PUBLIC wal PUBLIC scheduler + PUBLIC executor PUBLIC qworker ) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index ec71777882ba72769c90d9d222dbd949a0733090..8c4effa221f437acebab25fa34386135f9572bb4 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -21,6 +21,7 @@ #include "meta.h" #include "os.h" #include "scheduler.h" +#include "executor.h" #include "taoserror.h" #include "tlist.h" #include "tmsg.h" @@ -165,7 +166,7 @@ typedef struct STqTaskItem { int8_t status; int64_t offset; void* dst; - SSubQueryMsg* pMsg; + qTaskInfo_t task; } STqTaskItem; // new version diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ead856a06b17ccdb3cb01e1645d1c96860c10720..89d4af48fd843f33de1283679efceb32bfb3b597 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -634,7 +634,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; - SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + /*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/ // TODO: launch query and get output data void* outputData; @@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { // TODO: handle error } - ASSERT(taosArrayGetSize(pArray) == 0); STaskInfo *pInfo = taosArrayGet(pArray, 0); SArray* pTasks; schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); @@ -689,8 +688,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); - pTopic->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].status = 0; + pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL); } pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta @@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) // TODO: filter out unused column return 0; } + SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); @@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayPush(pArray, &colInfo); return pArray; } -/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t - * status) {*/ -/*return 0;*/ -/*}*/ diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2fe3392b25a5303b4500ccd34fe76087f1dfc687..73a30a62f57a5092a706df2221259854e8129fd2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -253,12 +253,8 @@ typedef struct SExecTaskInfo { STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure pthread_mutex_t lock; // used to synchronize the rsp/query threads -// tsem_t ready; -// int32_t dataReady; // denote if query result is ready or not -// void* rspContext; // response context char *sql; // query sql string jmp_buf env; // - DataSinkHandle dsHandle; struct SOperatorInfo *pRoot; } SExecTaskInfo; @@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6dea4a4e57392be988126c579648f39a8270b9bf..a9334022965c41a29617fad7cddcc88cb9acb9c9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -11,4 +11,36 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "planner.h" +#include "executor.h" + +qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { + if (pMsg == NULL || pStreamBlockReadHandle == NULL) { + return NULL; + } + + // print those info into log + pMsg->sId = be64toh(pMsg->sId); + pMsg->queryId = be64toh(pMsg->queryId); + pMsg->taskId = be64toh(pMsg->taskId); + pMsg->contentLen = ntohl(pMsg->contentLen); + + struct SSubplan *plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + qTaskInfo_t pTaskInfo = NULL; + code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); + if (code != TSDB_CODE_SUCCESS) { + // TODO: destroy SSubplan & pTaskInfo + terrno = code; + return NULL; + } + + return pTaskInfo; +} diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 9a81d2fe6b38eff127cedc1dc8a7d7f0b5fb71ab..c5b57ee1a5f8f6398f262032c2eff702f30980b7 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -69,11 +69,11 @@ void freeParam(STaskParam *param) { tfree(param->prevResult); } -int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { - assert(tsdb != NULL && pSubplan != NULL); +int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { + assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ goto _error; } - code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); + code = dsCreateDataSinker(pSubplan->pDataSink, handle); - *handle = (*pTask)->dsHandle; - -_error: + _error: // if failed to add ref for all tables in this query, abort current query return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index dc4d9c7238c5ced5df8c0c930413bf4072b91a4c..0ed480ed15f486cfc3f9bea1e6fdc35d8680bcfa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7778,22 +7778,29 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, return NULL; } -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { - tsdbReadHandleT tReaderHandle = NULL; - - int32_t code = 0; +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { uint64_t queryId = pPlan->id.queryId; - SPhyNode* pPhyNode = pPlan->pNode; - + int32_t code = TSDB_CODE_SUCCESS; *pTaskInfo = createExecTaskInfo(queryId); + if (*pTaskInfo == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _complete; + } (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); if ((*pTaskInfo)->pRoot == NULL) { - return terrno; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _complete; } - return TSDB_CODE_SUCCESS; + return code; + +_complete: + tfree(*pTaskInfo); + + terrno = code; + return code; } /** diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 4ff83641984fd3adc135c40f9cfdbab5d94bc4ec..41f50607cb47fbffd52bd5976e76824acfcb65c3 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -28,19 +28,20 @@ extern "C" { #define QNODE_TAGSCAN 1 #define QNODE_TABLESCAN 2 -#define QNODE_PROJECT 3 -#define QNODE_AGGREGATE 4 -#define QNODE_GROUPBY 5 -#define QNODE_LIMIT 6 -#define QNODE_JOIN 7 -#define QNODE_DISTINCT 8 -#define QNODE_SORT 9 -#define QNODE_UNION 10 -#define QNODE_TIMEWINDOW 11 -#define QNODE_SESSIONWINDOW 12 -#define QNODE_STATEWINDOW 13 -#define QNODE_FILL 14 -#define QNODE_MODIFY 15 +#define QNODE_STREAMSCAN 3 +#define QNODE_PROJECT 4 +#define QNODE_AGGREGATE 5 +#define QNODE_GROUPBY 6 +#define QNODE_LIMIT 7 +#define QNODE_JOIN 8 +#define QNODE_DISTINCT 9 +#define QNODE_SORT 10 +#define QNODE_UNION 11 +#define QNODE_TIMEWINDOW 12 +#define QNODE_SESSIONWINDOW 13 +#define QNODE_STATEWINDOW 14 +#define QNODE_FILL 15 +#define QNODE_MODIFY 16 typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 9c9ff8fe2b76b95e2a3dd573667cb7ae49d6bcc6..e817b764d5ff1aab8372718649722f65a097a654 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla switch(type) { case QNODE_TAGSCAN: + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); @@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ return pNode; } - SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); + SQueryPlanNode* pNode = NULL; + if (pQueryInfo->info.continueQuery) { + pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info); + } else { + pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); + } if (!pQueryInfo->info.projectionQuery) { SArray* p = pQueryInfo->exprList[0]; @@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pNode->numOfExpr = num; pNode->pExpr = taosArrayInit(num, POINTER_BYTES); taosArrayAddAll(pNode->pExpr, p); -// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); } } @@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t len = len1 + totalLen; switch(pQueryNode->info.type) { + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); @@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { assert(pQueryNode); - *str = calloc(1, 4096); int32_t len = sprintf(*str, "===== logic plan =====\n"); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 1e1a97df1f24daaef6f07a098c290ac2b06f1ca4..dd869d87b3839a014abe5dfea0173002ef047f1d 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); - return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; + return createUserTableScanNode(pPlanNode, pTable, type); } static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TAGSCAN: node = createTagScanNode(pPlanNode); break; + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 2abb90993b28e1304ce9b11a3376310b0ace654e..cf54fdec85c88ecc928e2f263a681ada6f514d07 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) { const SPhyNode* phyNode = (const SPhyNode*)obj; switch (phyNode->info.type) { + case OP_StreamScan: case OP_TableScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 566356e25595b75cb2e58de8d3fc72a858ddf99e..07f1b0f85856c95d979f6a66c3872271ab927852 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task string to subplan failed, code:%x", code); + QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { - QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); + QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } - if ((pTaskInfo && NULL == sinkHandle) || (NULL == pTaskInfo && sinkHandle)) { + if (NULL == sinkHandle || NULL == pTaskInfo) { QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d8904cdfa9cef97055db5a8c9f7ee77dca2a2b64..ddfa73f0a52be60530f614d6f925b2ff3b381a86 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -666,12 +666,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) int32_t taskDone = 0; int32_t code = 0; - SCH_TASK_DLOG("taskOnFailure, code:%x", errCode); + SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); + SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { code = schMoveTaskToFailList(pJob, pTask, &moved);