diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c0e2ec91e0610ba53512aa1a27f8840a881ded15..d07f29c487b707452929c98580903983362fcae8 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -232,9 +232,9 @@ typedef struct SSelectStmt { char stmtName[TSDB_TABLE_NAME_LEN]; uint8_t precision; bool isEmptyResult; + bool isTimeOrderQuery; bool hasAggFuncs; bool hasRepeatScanFuncs; - bool isTimeOrderQuery; } SSelectStmt; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 238600160a9ae8a9302fb061a0dcae0537d01745..aa2b32daab0d2733d2dcd398100d87e0f55815c1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -634,6 +634,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643) #define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644) #define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645) +#define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index c146d42e052344f297fe55adb505aab39c230d90..f82d29d27eeb2f8b80baf56ff7c065025abcc3b5 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -46,6 +46,7 @@ SSchema* getTableTagSchema(const STableMeta* pTableMeta); int32_t getNumOfColumns(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta); +STableMeta* tableMetaDup(const STableMeta* pTableMeta); int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index b7a14a81c64c630bdb714b8ac7c78768f4b0ed2b..74a023f0cd5f4248e6547dc8ad29c828f6573e72 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -78,7 +78,7 @@ static bool checkUserName(SAstCreateContext* pCxt, SToken* pUserName) { static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken, char* pPassword) { if (NULL == pPasswordToken) { pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR; - } else if (pPasswordToken->n >= (TSDB_USET_PASSWORD_LEN - 2)) { + } else if (pPasswordToken->n >= (TSDB_USET_PASSWORD_LEN + 2)) { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG); } else { strncpy(pPassword, pPasswordToken->z, pPasswordToken->n); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 3d069257c9db2be51e8d9e12eb14c5f2177f52c5..9c788202d302ca3f984f2c0d65db7b5ab61c8ae0 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -53,6 +53,7 @@ typedef struct SInsertParseContext { SHashObj* pTableBlockHashObj; // global SHashObj* pSubTableHashObj; // global SArray* pVgDataBlocks; // global + SHashObj* pTableNameHashObj; // global int32_t totalNum; SVnodeModifOpStmt* pOutput; SStmtCallback* pStmtCb; @@ -237,7 +238,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con return code; } -static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *dbFname, bool isStb) { +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) { SParseContext* pBasicCtx = pCxt->pComCxt; bool pass = false; @@ -252,6 +253,7 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *db } else { CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &pCxt->pTableMeta)); + ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0); SVgroupInfo vg; CHECK_CODE( catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg)); @@ -260,9 +262,13 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *db return TSDB_CODE_SUCCESS; } -static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, false); } +static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) { + return getTableMetaImpl(pCxt, name, dbFname, false); +} -static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, true); } +static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) { + return getTableMetaImpl(pCxt, name, dbFname, true); +} static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { while (start < end) { @@ -863,7 +869,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); char stbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&sname, stbFName); - + CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); @@ -1063,8 +1069,8 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { int32_t tbNum = 0; - char tbFName[TSDB_TABLE_FNAME_LEN]; - bool autoCreateTbl = false; + char tbFName[TSDB_TABLE_FNAME_LEN]; + bool autoCreateTbl = false; // for each table while (1) { @@ -1106,6 +1112,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); tNameExtractFullName(&name, tbFName); + CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); + // USING cluase if (TK_USING == sToken.type) { CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); @@ -1158,7 +1166,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); + (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, + pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; @@ -1187,7 +1196,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { .pSql = (char*)pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pTableMeta = NULL, - .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false), + .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), + .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), .totalNum = 0, .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pStmtCb = pContext->pStmtCb}; @@ -1196,12 +1206,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj); } else { - context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + context.pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || - NULL == context.pOutput) { + NULL == context.pTableNameHashObj || NULL == context.pOutput) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1214,6 +1225,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } + (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName)); + if (NULL == (*pQuery)->pTableList) { + return TSDB_CODE_OUT_OF_MEMORY; + } (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->haveResultSet = false; (*pQuery)->msgType = TDMT_VND_SUBMIT; @@ -1226,6 +1241,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(&context); } + if (TSDB_CODE_SUCCESS == code) { + SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); + while (NULL != pTable) { + taosArrayPush((*pQuery)->pTableList, pTable); + pTable = taosHashIterate(context.pTableNameHashObj, pTable); + } + } destroyInsertParseContext(&context); return code; } @@ -1479,7 +1501,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu taosMemoryFree(pSTSchema); } #endif - } if (rowEnd) { @@ -1677,10 +1698,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid); STableDataBlocks* pDataBlock = NULL; - ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta, - &pDataBlock, NULL, &smlHandle->createTblReq); - if(ret != TSDB_CODE_SUCCESS){ + ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), + TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, + pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq); + if (ret != TSDB_CODE_SUCCESS) { buildInvalidOperationMsg(&pBuf, "create data block error"); return ret; } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 8deaad6091953d4dbf24d03ed91fb3787e14c18b..677dbca0e95a67c0b6cb29e402964b2a20291c98 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -137,7 +137,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star } memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); - dataBuf->pTableMeta = pTableMeta; + dataBuf->pTableMeta = tableMetaDup(pTableMeta); SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo; SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta); @@ -465,7 +465,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p taosMemoryFreeClear(blkKeyInfo.pKeyTuple); return ret; } - + ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0); // the maximum expanded size in byte when a row-wise data is converted to SDataRow format int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3fed68188dd2dfa443f2a05ef94b1875fd61b6f0..c86c1ac2e97034a423d1949abd7e0a767be675e7 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -368,11 +368,15 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p return TSDB_CODE_SUCCESS; } +static bool isInternalPrimaryKey(const SColumnNode* pCol) { + return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME); +} + static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { bool found = false; if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; - if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) { + if (isInternalPrimaryKey(pCol)) { setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol); return true; } @@ -389,7 +393,9 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { SNode* pNode; FOREACH(pNode, pProjectList) { SExprNode* pExpr = (SExprNode*)pNode; - if (0 == strcmp(pCol->colName, pExpr->aliasName)) { + if (0 == strcmp(pCol->colName, pExpr->aliasName) || + ((QUERY_NODE_COLUMN == nodeType(pExpr) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId) && + isInternalPrimaryKey(pCol))) { setColumnInfoByExpr(pTable, pExpr, pCol); found = true; break; @@ -433,7 +439,11 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod } } if (!found) { - return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName); + if (isInternalPrimaryKey(pCol)) { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_INTERNAL_PK); + } else { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName); + } } return DEAL_RES_CONTINUE; } @@ -3655,6 +3665,9 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* destroyCreateTbReq(&req); return TSDB_CODE_OUT_OF_MEMORY; } + if (pStmt->ignoreExists) { + req.flags |= TD_CREATE_IF_NOT_EXISTS; + } SNode* pCol; col_id_t index = 0; FOREACH(pCol, pStmt->pCols) { @@ -3785,24 +3798,27 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { return code; } -static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, const char* pDbName, - const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { +static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, SKVRow row, + uint64_t suid, SVgroupInfo* pVgInfo) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; - strcpy(name.dbname, pDbName); + strcpy(name.dbname, pStmt->dbName); tNameGetFullDbName(&name, dbFName); struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; - req.name = strdup(pTableName); + req.name = strdup(pStmt->tableName); req.ctb.suid = suid; req.ctb.pTag = row; + if (pStmt->ignoreExists) { + req.flags |= TD_CREATE_IF_NOT_EXISTS; + } SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); if (pTableBatch == NULL) { SVgroupCreateTableBatch tBatch = {0}; tBatch.info = *pVgInfo; - strcpy(tBatch.dbName, pDbName); + strcpy(tBatch.dbName, pStmt->dbName); tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); taosArrayPush(tBatch.req.pArray, &req); @@ -3964,8 +3980,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, - pSuperTableMeta->uid, &info); + addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, row, pSuperTableMeta->uid, &info); } taosMemoryFreeClear(pSuperTableMeta); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 715b9b97e655bd6f02bd704cacfd88e1650729ab..43aea8de7c7dfbdc9b2652d9fb4a8073ab1c42e8 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -146,6 +146,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Invalid binary/nchar column length"; case TSDB_CODE_PAR_INVALID_TAGS_NUM: return "Invalid number of tag columns"; + case TSDB_CODE_PAR_INVALID_INTERNAL_PK: + return "Invalid _c0 or _rowts expression"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: @@ -191,7 +193,7 @@ int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } -SSchema *getTableColumnSchema(const STableMeta *pTableMeta) { +SSchema* getTableColumnSchema(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); return (SSchema*)pTableMeta->schema; } @@ -226,6 +228,23 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta) { return pTableMeta->tableInfo; } +static uint32_t getTableMetaSize(const STableMeta* pTableMeta) { + int32_t totalCols = 0; + if (pTableMeta->tableInfo.numOfColumns >= 0) { + totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; + } + + return sizeof(STableMeta) + totalCols * sizeof(SSchema); +} + +STableMeta* tableMetaDup(const STableMeta* pTableMeta) { + size_t size = getTableMetaSize(pTableMeta); + + STableMeta* p = taosMemoryMalloc(size); + memcpy(p, pTableMeta, size); + return p; +} + int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen) { if (len <= 0 || dlen <= 0) return 0; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 30f6f03a6cd6214a131d4a56be29a77d407f7468..5ed7d9c1b54418231db1d727687942f9c117b307 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -151,20 +151,32 @@ static bool needOptimizeDynamicScan(const SFunctionNode* pFunc) { static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs, SNodeList** pDsoFuncs) { SNodeList* pAllFuncs = osdGetAllFuncs(pScan->node.pParent); + SNodeList* pTmpSdrFuncs = NULL; + SNodeList* pTmpDsoFuncs = NULL; SNode* pFunc = NULL; + bool otherFunc = false; FOREACH(pFunc, pAllFuncs) { int32_t code = TSDB_CODE_SUCCESS; if (needOptimizeDataRequire((SFunctionNode*)pFunc)) { - code = nodesListMakeStrictAppend(pSdrFuncs, nodesCloneNode(pFunc)); + code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pFunc)); } else if (needOptimizeDynamicScan((SFunctionNode*)pFunc)) { - code = nodesListMakeStrictAppend(pDsoFuncs, nodesCloneNode(pFunc)); + code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pFunc)); + } else { + otherFunc = true; } if (TSDB_CODE_SUCCESS != code) { - nodesDestroyList(*pSdrFuncs); - nodesDestroyList(*pDsoFuncs); + nodesDestroyList(pTmpSdrFuncs); + nodesDestroyList(pTmpDsoFuncs); return code; } } + if (otherFunc) { + nodesDestroyList(pTmpSdrFuncs); + nodesDestroyList(pTmpDsoFuncs); + } else { + *pSdrFuncs = pTmpSdrFuncs; + *pDsoFuncs = pTmpDsoFuncs; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 36625b28fb3d101a6c1ae0d5a33a5fda3bbecdb9..3c83d9f53a8669535eda1dc883af2951e9470d54 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -18,7 +18,7 @@ static char* getUsageErrFormat(int32_t errCode) { switch (errCode) { case TSDB_CODE_PLAN_EXPECTED_TS_EQUAL: - return "l.ts = r.ts is expected in join expression"; + return "left.ts = right.ts is expected in join expression"; case TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN: return "not support cross join"; default: diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 6c7b1d0a0e9bb2f757f36915af33065566b7c6d9..77f9b5846c1edbe879c21d90c978a01232c2444e 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -28,6 +28,8 @@ TEST_F(PlanOptimizeTest, optimizeScanData) { run("SELECT COUNT(c1) FROM t1"); run("SELECT COUNT(CAST(c1 AS BIGINT)) FROM t1"); + + run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"); } TEST_F(PlanOptimizeTest, orderByPrimaryKey) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1ce074c49fb08f13688d31f0dc4a2f4e8071b820..2710e54f9533ce9ed34552bfa0745d5854c7ad34 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1092,11 +1092,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVCreateTbRsp *rsp = batchRsp.pRsps + i; - if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { - tDecoderClear(&coder); - SCH_ERR_JRET(rsp->code); - } else if (TSDB_CODE_SUCCESS != rsp->code) { + if (TSDB_CODE_SUCCESS != rsp->code) { code = rsp->code; + tDecoderClear(&coder); + SCH_ERR_JRET(code); } } } @@ -1117,11 +1116,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVDropTbRsp *rsp = batchRsp.pRsps + i; - if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { - tDecoderClear(&coder); - SCH_ERR_JRET(rsp->code); - } else if (TSDB_CODE_SUCCESS != rsp->code) { + if (TSDB_CODE_SUCCESS != rsp->code) { code = rsp->code; + tDecoderClear(&coder); + SCH_ERR_JRET(code); } } } @@ -1137,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(rspCode); if (msg) { - SDecoder coder = {0}; + SDecoder coder = {0}; SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); tDecoderInit(&coder, msg, msgSize); code = tDecodeSSubmitRsp(&coder, rsp); @@ -1146,10 +1144,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch tFreeSSubmitRsp(rsp); SCH_ERR_JRET(code); } - + + if (rsp->nBlocks > 0) { + for (int32_t i = 0; i < rsp->nBlocks; ++i) { + SSubmitBlkRsp *blk = rsp->pBlocks + i; + if (TSDB_CODE_SUCCESS != blk->code) { + code = blk->code; + tFreeSSubmitRsp(rsp); + SCH_ERR_JRET(code); + } + } + } + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); - + if (pJob->attr.needRes) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (pJob->resData) { @@ -1160,7 +1169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); - } else { + } else { pJob->resData = rsp; } SCH_UNLOCK(SCH_WRITE, &pJob->resLock);