diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 805b4c7264e57794cfd31d7207784d47edde8cbc..11b91f056871eb5065d758c9c8d8a58b48828e19 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -101,6 +101,7 @@ void qmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, QNODE_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 7a226a4c6bcc1c3070b18737455c072822c3820b..b21141001aa96c8d26c4a1297f9f2d5fe19a56c7 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -84,6 +84,8 @@ int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { // return vnodeGetTableMeta(pQnode, pMsg); case TDMT_VND_CONSUME: // return tqProcessConsumeReq(pQnode->pTq, pMsg); + case TDMT_VND_QUERY_HEARTBEAT: + return qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); default: qError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 997397314fc253d55530d9729c3961cc1357950b..64206fc10aac0ab9835d65333322657a0ccaecbf 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -60,4 +60,5 @@ void dsScheduleProcess(void* ahandle, void* pItem) { void dsDestroyDataSinker(DataSinkHandle handle) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; pHandleImpl->fDestroy(pHandleImpl); + taosMemoryFree(pHandleImpl); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 943d4b278396ccf2490fddd779864be4a3e026f4..d78ae2b79b46bcbb27d6311ca3d1b596c6f2a034 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4687,7 +4687,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); - pTaskInfo->id.str = strdup(p); + pTaskInfo->id.str = p; return pTaskInfo; } @@ -5301,6 +5301,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); + destroyOperatorInfo(pTaskInfo->pRoot); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0bceb84f4a2807406b1877d5a938316934d009b1..319b68016156716749b38725004eae7b45756804 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1337,7 +1337,9 @@ void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: - pVal->pz = pNode->datum.p; + pVal->pz = taosMemoryMalloc(pVal->nLen + VARSTR_HEADER_SIZE + 1); + memcpy(pVal->pz, pNode->datum.p, pVal->nLen + VARSTR_HEADER_SIZE); + pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0; break; case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_DECIMAL: diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index ca471262ff747b2466159a93571ac77120b3b9ac..3ecf9878119be9891e74e56e4d59afc23bb99b34 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -25,6 +25,7 @@ extern "C" { #include "tlockfree.h" #include "ttimer.h" #include "tref.h" +#include "plannodes.h" #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 @@ -131,8 +132,9 @@ typedef struct SQWTaskCtx { int8_t events[QW_EVENT_MAX]; - void *taskHandle; - void *sinkHandle; + void *taskHandle; + void *sinkHandle; + SSubplan *plan; } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index bd7ee6321aa067357d36d6739ea252ce9e00fdc1..717958c033c491304ed9c7ceba909a49b1f03e3f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -424,6 +424,11 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { dsDestroyDataSinker(ctx->sinkHandle); ctx->sinkHandle = NULL; } + + if (ctx->plan) { + nodesDestroyNode(ctx->plan); + ctx->plan = NULL; + } } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { @@ -440,6 +445,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); + atomic_store_ptr(&ctx->plan, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); @@ -922,7 +928,7 @@ _return: int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t code = 0; bool queryRsped = false; - struct SSubplan *plan = NULL; + SSubplan* plan = NULL; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; @@ -950,6 +956,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(code); } + ctx->plan = plan; + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -1428,6 +1436,10 @@ void qwCloseRef(void) { taosWUnLockLatch(&gQwMgmt.lock); } +void qwDestroySchStatus(SQWSchStatus *pStatus) { + taosHashCleanup(pStatus->tasksHash); +} + void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; @@ -1439,6 +1451,13 @@ void qwDestroyImpl(void *pMgmt) { // TODO FREE ALL taosHashCleanup(mgmt->ctxHash); + + void *pIter = taosHashIterate(mgmt->schHash, NULL); + while (pIter) { + SQWSchStatus *sch = (SQWSchStatus *)pIter; + qwDestroySchStatus(sch); + pIter = taosHashIterate(mgmt->schHash, pIter); + } taosHashCleanup(mgmt->schHash); taosMemoryFree(mgmt); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 56b5ddd38a2e5698af8af77b2cecf458fff5b357..604764388186722eb7d3ea374ee9e0b867bf1899 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -791,6 +791,10 @@ static void uvDestroyConn(uv_handle_t* handle) { } QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); + if (conn->regArg.init == 1) { + transFreeMsg(conn->regArg.msg.pCont); + conn->regArg.init = 0; + } taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 828216d1d3a28608f5d990ef0658dea3ad8fae7c..99e546fcd0b9a32bd457a16926ac5d371df211a6 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -398,7 +398,7 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType, int32 } } -void generateQuerySQL(BindData *data, int32_t tblIdx) { +void generateQueryCondSQL(BindData *data, int32_t tblIdx) { int32_t len = sprintf(data->sql, "select * from %s%d where ", bpTbPrefix, tblIdx); if (!gCurCase->fullCol) { for (int c = 0; c < gCurCase->bindColNum; ++c) { @@ -462,6 +462,72 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { } } + +void generateQueryMiscSQL(BindData *data, int32_t tblIdx) { + int32_t len = sprintf(data->sql, "select * from %s%d where ", bpTbPrefix, tblIdx); + if (!gCurCase->fullCol) { + for (int c = 0; c < gCurCase->bindColNum; ++c) { + if (c) { + len += sprintf(data->sql + len, " and "); + } + switch (data->pBind[c].buffer_type) { + case TSDB_DATA_TYPE_BOOL: + len += sprintf(data->sql + len, "booldata"); + break; + case TSDB_DATA_TYPE_TINYINT: + len += sprintf(data->sql + len, "tinydata"); + break; + case TSDB_DATA_TYPE_SMALLINT: + len += sprintf(data->sql + len, "smalldata"); + break; + case TSDB_DATA_TYPE_INT: + len += sprintf(data->sql + len, "intdata"); + break; + case TSDB_DATA_TYPE_BIGINT: + len += sprintf(data->sql + len, "bigdata"); + break; + case TSDB_DATA_TYPE_FLOAT: + len += sprintf(data->sql + len, "floatdata"); + break; + case TSDB_DATA_TYPE_DOUBLE: + len += sprintf(data->sql + len, "doubledata"); + break; + case TSDB_DATA_TYPE_VARCHAR: + len += sprintf(data->sql + len, "binarydata"); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + len += sprintf(data->sql + len, "ts"); + break; + case TSDB_DATA_TYPE_NCHAR: + len += sprintf(data->sql + len, "nchardata"); + break; + case TSDB_DATA_TYPE_UTINYINT: + len += sprintf(data->sql + len, "utinydata"); + break; + case TSDB_DATA_TYPE_USMALLINT: + len += sprintf(data->sql + len, "usmalldata"); + break; + case TSDB_DATA_TYPE_UINT: + len += sprintf(data->sql + len, "uintdata"); + break; + case TSDB_DATA_TYPE_UBIGINT: + len += sprintf(data->sql + len, "ubigdata"); + break; + default: + printf("!!!invalid col type:%d", data->pBind[c].buffer_type); + exit(1); + } + + bpAppendOperatorParam(data, &len, data->pBind[c].buffer_type, c); + } + } + + if (gCaseCtrl.printStmtSql) { + printf("\tSTMT SQL: %s\n", data->sql); + } +} + + void generateErrorSQL(BindData *data, int32_t tblIdx) { int32_t len = 0; data->sql = taosMemoryCalloc(1, 1024); @@ -677,7 +743,7 @@ int32_t prepareInsertData(BindData *data) { return 0; } -int32_t prepareQueryData(BindData *data, int32_t tblIdx) { +int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) { static int64_t tsData = 1591060628000; uint64_t bindNum = gCurCase->rowNum / gCurCase->bindRowNum; @@ -735,6 +801,63 @@ int32_t prepareQueryData(BindData *data, int32_t tblIdx) { } +int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { + static int64_t tsData = 1591060628000; + uint64_t bindNum = gCurCase->rowNum / gCurCase->bindRowNum; + + data->colNum = 0; + data->colTypes = taosMemoryCalloc(30, sizeof(int32_t)); + data->sql = taosMemoryCalloc(1, 1024); + data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); + data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t)); + data->boolData = taosMemoryMalloc(bindNum * sizeof(bool)); + data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t)); + data->utinyData = taosMemoryMalloc(bindNum * sizeof(uint8_t)); + data->smallData = taosMemoryMalloc(bindNum * sizeof(int16_t)); + data->usmallData = taosMemoryMalloc(bindNum * sizeof(uint16_t)); + data->intData = taosMemoryMalloc(bindNum * sizeof(int32_t)); + data->uintData = taosMemoryMalloc(bindNum * sizeof(uint32_t)); + data->bigData = taosMemoryMalloc(bindNum * sizeof(int64_t)); + data->ubigData = taosMemoryMalloc(bindNum * sizeof(uint64_t)); + data->floatData = taosMemoryMalloc(bindNum * sizeof(float)); + data->doubleData = taosMemoryMalloc(bindNum * sizeof(double)); + data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize); + data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t)); + if (gCurCase->bindNullNum) { + data->isNull = taosMemoryCalloc(bindNum, sizeof(char)); + } + + for (int32_t i = 0; i < bindNum; ++i) { + data->tsData[i] = tsData + tblIdx*gCurCase->rowNum + rand()%gCurCase->rowNum; + data->boolData[i] = (bool)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->tinyData[i] = (int8_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->utinyData[i] = (uint8_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->smallData[i] = (int16_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->usmallData[i] = (uint16_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->intData[i] = (int32_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->uintData[i] = (uint32_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->bigData[i] = (int64_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->ubigData[i] = (uint64_t)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->floatData[i] = (float)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); + memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen); + if (gCurCase->bindNullNum) { + data->isNull[i] = i % 2; + } + data->binaryLen[i] = gVarCharLen; + } + + for (int b = 0; b < bindNum; b++) { + for (int c = 0; c < gCurCase->bindColNum; ++c) { + prepareColData(data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); + } + } + + generateQueryMiscSQL(data, tblIdx); + + return 0; +} + void destroyData(BindData *data) { @@ -1385,7 +1508,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { for (int32_t t = 0; t< gCurCase->tblNum; ++t) { memset(&data, 0, sizeof(data)); - prepareQueryData(&data, t); + prepareQueryCondData(&data, t); int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ @@ -1431,7 +1554,7 @@ int querySUBTTest2(TAOS_STMT *stmt, TAOS *taos) { for (int32_t t = 0; t< gCurCase->tblNum; ++t) { memset(&data, 0, sizeof(data)); - prepareQueryData(&data, t); + prepareQueryMiscData(&data, t); int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){