From 1850dc037183e05c04834f45ee33a03a909bf730 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 28 Apr 2022 11:47:55 +0800 Subject: [PATCH] stmt query --- include/libs/planner/planner.h | 2 +- source/client/inc/clientStmt.h | 27 ++++++++----- source/client/src/clientStmt.c | 54 +++++++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +-- source/libs/parser/src/parAstCreater.c | 5 +++ source/libs/planner/src/planner.c | 24 +++++++++++- source/libs/scalar/src/scalar.c | 5 ++- source/libs/scalar/src/sclfunc.c | 5 --- tests/script/api/batchprepare.c | 11 +++--- 9 files changed, 95 insertions(+), 44 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 0f6a5106be..adbd84b044 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -50,7 +50,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // @pSource one execution location of this group of datasource subplans int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); -int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx); +int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId); // Convert to subplan to string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 061d757551..ab27d7f00c 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -46,6 +46,12 @@ typedef struct SStmtTableCache { void* boundTags; } SStmtTableCache; +typedef struct SQueryFields { + TAOS_FIELD* fields; + TAOS_FIELD* userFields; + uint32_t numOfCols; +} SQueryFields; + typedef struct SStmtBindInfo { bool needParse; uint64_t tbUid; @@ -66,16 +72,17 @@ typedef struct SStmtExecInfo { } SStmtExecInfo; typedef struct SStmtSQLInfo { - STMT_TYPE type; - STMT_STATUS status; - bool autoCreate; - uint64_t runTimes; - SHashObj* pTableCache; //SHash - SQuery* pQuery; - char* sqlStr; - int32_t sqlLen; - SArray* nodeList; - SQueryPlan* pQueryPlan; + STMT_TYPE type; + STMT_STATUS status; + bool autoCreate; + uint64_t runTimes; + SHashObj* pTableCache; //SHash + SQuery* pQuery; + char* sqlStr; + int32_t sqlLen; + SArray* nodeList; + SQueryPlan* pQueryPlan; + SQueryFields fields; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 298a06572e..ca6a11a668 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -73,6 +73,22 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { return TSDB_CODE_SUCCESS; } +int32_t stmtBackupQueryFields(STscStmt* pStmt) { + SQueryFields *pFields = &pStmt->sql.fields; + int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD); + + pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols; + pFields->fields = taosMemoryMalloc(size); + pFields->userFields = taosMemoryMalloc(size); + if (NULL == pFields->fields || NULL == pFields->userFields) { + STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size); + memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + + return TSDB_CODE_SUCCESS; +} + int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) { STscStmt* pStmt = (STscStmt*)stmt; @@ -258,37 +274,42 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STableMeta *pTableMeta = NULL; SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta)); - - if (pTableMeta->uid == pStmt->bInfo.tbUid) { + uint64_t uid = pTableMeta->uid; + uint64_t suid = pTableMeta->suid; + int8_t tableType = pTableMeta->tableType; + taosMemoryFree(pTableMeta); + + if (uid == pStmt->bInfo.tbUid) { pStmt->bInfo.needParse = false; - + return TSDB_CODE_SUCCESS; } - if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) { - SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid)); + if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) { + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid)); if (NULL == pCache) { - tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid); + tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); } pStmt->bInfo.needParse = false; - pStmt->bInfo.tbUid = pTableMeta->uid; - pStmt->bInfo.tbSuid = pTableMeta->suid; - pStmt->bInfo.tbType = pTableMeta->tableType; + pStmt->bInfo.tbUid = uid; + pStmt->bInfo.tbSuid = suid; + pStmt->bInfo.tbType = tableType; pStmt->bInfo.boundTags = pCache->boundTags; - + return TSDB_CODE_SUCCESS; } - SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid)); + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid)); if (pCache) { pStmt->bInfo.needParse = false; - pStmt->bInfo.tbUid = pTableMeta->uid; - pStmt->bInfo.tbSuid = pTableMeta->suid; - pStmt->bInfo.tbType = pTableMeta->tableType; + pStmt->bInfo.tbUid = uid; + pStmt->bInfo.tbSuid = suid; + pStmt->bInfo.tbType = tableType; pStmt->bInfo.boundTags = pCache->boundTags; STableDataBlocks* pNewBlock = NULL; @@ -475,9 +496,10 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) { STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList)); pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag; pStmt->exec.pRequest->body.pDag = NULL; + STMT_ERR_RET(stmtBackupQueryFields(pStmt)); } - STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx)); + STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId)); } STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid)); @@ -549,6 +571,8 @@ int stmtClose(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; STMT_RET(stmtCleanSQLInfo(pStmt)); + + taosMemoryFree(stmt); } const char *stmtErrstr(TAOS_STMT *stmt) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 483475601b..dd56c0cc51 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1521,8 +1521,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } else if (isRow1DataRow) { colIdOfRow1 = pSchema1->columns[j].colId; } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j); - colIdOfRow1 = pColIdx->colId; + colIdOfRow1 = tdKvRowColIdAt(row1, j); } int32_t colIdOfRow2; @@ -1531,8 +1530,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } else if (isRow2DataRow) { colIdOfRow2 = pSchema2->columns[k].colId; } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k); - colIdOfRow2 = pColIdx->colId; + colIdOfRow2 = tdKvRowColIdAt(row2, j); } if (colIdOfRow1 == colIdOfRow2) { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6f30dca0c5..a92455a447 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -374,6 +374,11 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; + if (TSDB_DATA_TYPE_BINARY == dt.type) { + func->node.resType.bytes += 2; + } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2; + } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index be31b48fda..70a969584a 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -165,15 +165,35 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { return TSDB_CODE_SUCCESS; } -int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx) { +static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) { + int64_t queryId = *(uint64_t *)pContext; + + if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pNode)) { + SQueryPlan* planNode = (SQueryPlan*)pNode; + planNode->queryId = queryId; + } else if (QUERY_NODE_PHYSICAL_SUBPLAN == nodeType(pNode)) { + SSubplan* subplanNode = (SSubplan*)pNode; + subplanNode->id.queryId = queryId; + } + + return DEAL_RES_CONTINUE; +} + +int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId) { + int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues); + if (colIdx < 0) { - int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues); for (int32_t i = 0; i < size; ++i) { setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i); } } else { setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams); } + + if (colIdx < 0 || ((colIdx + 1) == size)) { + nodesWalkPhysiPlan((SNode*)pPlan, updatePlanQueryId, &queryId); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index cd5fe47c17..a6656dc87d 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -590,7 +590,10 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { if (colDataIsNull_s(output.columnData, 0)) { res->node.resType.type = TSDB_DATA_TYPE_NULL; } else { - res->node.resType = node->node.resType; + res->node.resType.type = output.columnData->info.type; + res->node.resType.bytes = output.columnData->info.bytes; + res->node.resType.scale = output.columnData->info.scale; + res->node.resType.precision = output.columnData->info.precision; int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 28514c3605..b19c844f83 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -648,11 +648,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); - if (IS_VAR_DATA_TYPE(outputType)) { - int32_t factor = (TSDB_DATA_TYPE_NCHAR == outputType) ? TSDB_NCHAR_SIZE : 1; - outputLen = outputLen * factor + VARSTR_HEADER_SIZE; - } - char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index e65d298690..b1ab5253ad 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -168,7 +168,7 @@ typedef struct { int32_t caseRunNum; // total run case num } CaseCtrl; -#if 0 +#if 1 CaseCtrl gCaseCtrl = { .bindNullNum = 0, .prepareStb = false, @@ -190,8 +190,7 @@ CaseCtrl gCaseCtrl = { .bindColTypeNum = tListLen(bindColTypeList), .bindColTypeList = bindColTypeList, -// .caseIdx = 22, - .caseIdx = 2, + .caseIdx = 22, .caseNum = 1, .caseRunNum = 1, @@ -211,10 +210,10 @@ CaseCtrl gCaseCtrl = { .checkParamNum = false, .printRes = true, .runTimes = 0, - .caseIdx = 2, - .caseNum = 1, + .caseIdx = -1, + .caseNum = -1, .caseRunIdx = -1, - .caseRunNum = 1, + .caseRunNum = -1, }; #endif -- GitLab