diff --git a/include/common/taosdef.h b/include/common/taosdef.h index e1f8832edf146f71deed320a21c24c5df5a25292..5384082da32716217dc72ba599c337b7c73fe7bb 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -89,7 +89,7 @@ extern char *qtypeStr[]; #define TSDB_PORT_HTTP 11 -#undef TD_DEBUG_PRINT_ROW +#define TD_DEBUG_PRINT_ROW #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ff9c2a07393d6a4700a32b5c0ae2c4eb6d8e6da1..0b803bb5728e250a0be566156d9b17edcc4194c9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -254,10 +254,7 @@ int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); typedef struct { int8_t hashMeta; int64_t uid; - union { - char* ename; // used for encode - const char* dname; // used for decode - }; + char* tblFName; int32_t numOfRows; int32_t affectedRows; } SSubmitBlkRsp; @@ -274,6 +271,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); +void tFreeSSubmitRsp(SSubmitRsp *pRsp); #define SCHEMA_SMA_ON 0x1 #define SCHEMA_IDX_ON 0x2 diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 460749243c9b4e7352b409869d613955c1ae0916..b3f35025d1a547353a49903a8e02b7b46a5e9aa4 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -56,6 +56,7 @@ typedef struct SQueryResult { uint64_t numOfRows; int32_t msgSize; char *msg; + void *res; } SQueryResult; typedef struct STaskInfo { @@ -71,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b021651c16640e8ee48554d59f735be45515ec86..ab11cfa6fde8df0d12946540a57809e81d0de0bf 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -307,9 +307,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v // --- mq void hbMgrInitMqHbRspHandle(); -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery); +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res); int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); -int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); +int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); #ifdef __cplusplus diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 153cb84986ccb2777cbaa1921197b5ac7f9c052e..e5efafc21445dd912501344c6e2c0e0ccd3203b9 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -55,6 +55,7 @@ typedef struct SStmtQueryResInfo { typedef struct SStmtBindInfo { bool needParse; + bool inExecCache; uint64_t tbUid; uint64_t tbSuid; int32_t sBindRowNum; @@ -64,6 +65,7 @@ typedef struct SStmtBindInfo { void* boundTags; char tbName[TSDB_TABLE_FNAME_LEN];; char tbFName[TSDB_TABLE_FNAME_LEN]; + char stbFName[TSDB_TABLE_FNAME_LEN]; SName sname; } SStmtBindInfo; @@ -72,12 +74,12 @@ typedef struct SStmtExecInfo { SRequestObj* pRequest; SHashObj* pVgHash; SHashObj* pBlockHash; + bool autoCreateTbl; } SStmtExecInfo; typedef struct SStmtSQLInfo { STMT_TYPE type; STMT_STATUS status; - bool autoCreateTbl; uint64_t runTimes; SHashObj* pTableCache; //SHash SQuery* pQuery; @@ -86,6 +88,7 @@ typedef struct SStmtSQLInfo { SArray* nodeList; SQueryPlan* pQueryPlan; SStmtQueryResInfo queryRes; + bool autoCreateTbl; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 97c7d2bad1bcf54fad6c4a49922b5e17c06f7166..d2e95d5eab4fcd4ec02c4674659ad177a0ce4d82 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -286,12 +286,12 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { pResInfo->precision = precision; } -int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { +int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, - pRequest->metric.start, &res); + pRequest->metric.start, NULL != pRes, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); @@ -310,6 +310,10 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } } + if (pRes) { + *pRes = res.res; + } + pRequest->code = res.code; terrno = res.code; return pRequest->code; @@ -320,7 +324,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList); } -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) { +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { if (TSDB_CODE_SUCCESS == code) { switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: @@ -333,7 +337,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); if (TSDB_CODE_SUCCESS == code) { - code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList); + code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res); } taosArrayDestroy(pNodeList); break; @@ -373,7 +377,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } - return launchQueryImpl(pRequest, pQuery, code, false); + return launchQueryImpl(pRequest, pQuery, code, false, NULL); } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 59a7cbee1a4cbac291df6ec24c6579a7fa1a21d7..3656edaff39e7c7bf9f8cbb98d3e56364260cc41 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1668,7 +1668,7 @@ static int32_t smlInsertData(SSmlHandle* info) { smlBuildOutput(info->exec, info->pVgHash); info->cost.insertRpcTime = taosGetTimestampUs(); - launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); + launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true, NULL); info->affectedRows = taos_affected_rows(info->pRequest); return info->pRequest->code; diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 61a850802a70112e69b06a5fac5b68f78332168c..d8262bc9a2e922b49400dd3250dc35c40f4a06c8 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -136,11 +136,12 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) { +int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; pStmt->exec.pVgHash = pVgHash; pStmt->exec.pBlockHash = pBlockHash; + pStmt->exec.autoCreateTbl = autoCreateTbl; return TSDB_CODE_SUCCESS; } @@ -149,10 +150,10 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char STscStmt* pStmt = (STscStmt*)stmt; STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName)); - STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash)); - - pStmt->sql.autoCreateTbl = autoCreateTbl; + STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl)); + pStmt->sql.autoCreateTbl = autoCreateTbl; + return TSDB_CODE_SUCCESS; } @@ -192,8 +193,12 @@ int32_t stmtCacheBlock(STscStmt *pStmt) { return TSDB_CODE_OUT_OF_MEMORY; } - pStmt->bInfo.boundTags = NULL; - + if (pStmt->sql.autoCreateTbl) { + pStmt->bInfo.tagsCached = true; + } else { + pStmt->bInfo.boundTags = NULL; + } + return TSDB_CODE_SUCCESS; } @@ -235,6 +240,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bInfo.tbSuid = 0; pStmt->bInfo.tbType = 0; pStmt->bInfo.needParse = true; + pStmt->bInfo.inExecCache = false; pStmt->bInfo.tbName[0] = 0; pStmt->bInfo.tbFName[0] = 0; @@ -259,7 +265,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { char *key = taosHashGetKey(pIter, &keyLen); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); - if (keepTable && (pMeta->uid == pStmt->bInfo.tbUid)) { + if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); @@ -272,6 +278,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); } + pStmt->exec.autoCreateTbl = false; + if (keepTable) { return TSDB_CODE_SUCCESS; } @@ -315,11 +323,45 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.needParse = true; + pStmt->bInfo.inExecCache = false; + + STableDataBlocks *pBlockInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (pBlockInExec) { + pStmt->bInfo.inExecCache = true; + + if (pStmt->sql.autoCreateTbl) { + return TSDB_CODE_SUCCESS; + } + } if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { + if (pStmt->bInfo.inExecCache) { + ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1); + pStmt->bInfo.needParse = false; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_SUCCESS; } + if (pStmt->sql.autoCreateTbl) { + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid)); + if (pCache) { + pStmt->bInfo.needParse = false; + + STableDataBlocks* pNewBlock = NULL; + STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock, 0)); + + if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + STMT_RET(stmtCleanBindInfo(pStmt)); + } + if (NULL == pStmt->pCatalog) { STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); } @@ -347,7 +389,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - if (taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName))) { + if (pStmt->bInfo.inExecCache) { SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid)); if (NULL == pCache) { tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid); @@ -484,11 +526,13 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); - if (!pStmt->bInfo.needParse) { // table already cached, no need create table - return TSDB_CODE_SUCCESS; + if (pStmt->bInfo.needParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); } - STMT_ERR_RET(stmtParseSql(pStmt)); + if (pStmt->bInfo.inExecCache) { + return TSDB_CODE_SUCCESS; + } STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { @@ -612,17 +656,73 @@ int stmtAddBatch(TAOS_STMT *stmt) { return TSDB_CODE_SUCCESS; } +int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) { + if (pRsp->nBlocks <= 0) { + tscError("invalid submit resp block number %d", pRsp->nBlocks); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + size_t keyLen = 0; + STableDataBlocks **pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + while (pIter) { + STableDataBlocks *pBlock = *pIter; + char *key = taosHashGetKey(pIter, &keyLen); + + STableMeta *pMeta = qGetTableMetaInDataBlock(pBlock); + if (pMeta->uid != pStmt->bInfo.tbUid) { + tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + if (pMeta->uid) { + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + continue; + } + + SSubmitBlkRsp *blkRsp = NULL; + int32_t i = 0; + for (; i < pRsp->nBlocks; ++i) { + blkRsp = pRsp->pBlocks + i; + if (strlen(blkRsp->tblFName) != keyLen) { + continue; + } + + if (strncmp(blkRsp->tblFName, key, keyLen)) { + continue; + } + + break; + } + + if (i < pRsp->nBlocks) { + tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid, blkRsp->uid); + + pMeta->uid = blkRsp->uid; + pStmt->bInfo.tbUid = blkRsp->uid; + } else { + tscError("table %s not found in submit rsp", pStmt->bInfo.tbFName); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + } + + return TSDB_CODE_SUCCESS; +} + int stmtExec(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; + SSubmitRsp *pRsp = NULL; + bool autoCreateTbl = pStmt->exec.autoCreateTbl; STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY == pStmt->sql.type) { - scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList); + scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL); } else { STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL)); } if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { @@ -643,6 +743,15 @@ int stmtExec(TAOS_STMT *stmt) { _return: stmtCleanExecInfo(pStmt, (code ? false : true), false); + + if (TSDB_CODE_SUCCESS == code && autoCreateTbl) { + if (NULL == pRsp) { + tscError("no submit resp got for auto create table"); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp)); + } ++pStmt->sql.runTimes; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8778a4cf6f40b1f3e0a8d693f8e36e56720559b4..c68ce3c08b17231221e3160bc5bc746344cc21ef 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4022,7 +4022,7 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; - if (tEncodeCStr(pEncoder, pBlock->ename) < 0) return -1; + if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1; } if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; @@ -4037,7 +4037,9 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; - if (tDecodeCStr(pDecoder, &pBlock->dname) < 0) return -1; + pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); + if (NULL == pBlock->tblFName) return -1; + if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; } if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; @@ -4068,12 +4070,29 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1; - pRsp->pBlocks = tDecoderMalloc(pDecoder, sizeof(*pRsp->pBlocks) * pRsp->nBlocks); + pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks)); if (pRsp->pBlocks == NULL) return -1; for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) { if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; } - tEndDecode(pDecoder); + tEndDecode(pDecoder); + tDecoderClear(pDecoder); return 0; } + +void tFreeSSubmitRsp(SSubmitRsp *pRsp) { + if (NULL == pRsp) return; + + if (pRsp->pBlocks) { + for (int32_t i = 0; i < pRsp->nBlocks; ++i) { + SSubmitBlkRsp *sRsp = pRsp->pBlocks + i; + taosMemoryFree(sRsp->tblFName); + } + + taosMemoryFree(pRsp->pBlocks); + } + + taosMemoryFree(pRsp); +} + diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 95156c1e1cc8025d8be044bd423c2bf13ba47550..466963f17f9c2072c0a96440f812ac54027903b7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -342,8 +342,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; - pRsp->numOfRows = pRsp->numOfRows; - pRsp->affectedRows = pRsp->affectedRows; + pRsp->numOfRows = pMsgIter->numOfRows; + pRsp->affectedRows = pMsgIter->numOfRows; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3e869650bf8ec3934c8596598a365a1653912c98..8156e6c512132337be9de46497b1327661cc3947 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -119,7 +119,7 @@ _exit: taosMemoryFree(metaRsp.pSchemas); metaReaderClear(&mer2); metaReaderClear(&mer1); - return code; + return TSDB_CODE_SUCCESS; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 678c9edbd4b26bf81257c4d61570073bbecdb6d3..10cdd9e85aee14d65293ad8190bc22f930216bdf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -497,43 +497,51 @@ _exit: return 0; } +static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { + SSubmitBlkIter blkIter = {0}; + STSchema *pSchema = NULL; + tb_uid_t suid = 0; + STSRow *row = NULL; + + tInitSubmitBlkIter(msgIter, pBlock, &blkIter); + if (blkIter.row == NULL) return 0; + if (!pSchema || (suid != msgIter->suid)) { + if (pSchema) { + taosMemoryFreeClear(pSchema); + } + pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema + if (pSchema) { + suid = msgIter->suid; + } + } + if (!pSchema) { + printf("%s:%d no valid schema\n", tags, __LINE__); + return -1; + } + char __tags[128] = {0}; + snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid); + while ((row = tGetSubmitBlkNext(&blkIter))) { + tdSRowPrint(row, pSchema, __tags); + } + + taosMemoryFreeClear(pSchema); + + return TSDB_CODE_SUCCESS; +} + static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; SMeta *pMeta = pVnode->pMeta; SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - STSchema *pSchema = NULL; - tb_uid_t suid = 0; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; - tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); - if (blkIter.row == NULL) continue; - if (!pSchema || (suid != msgIter.suid)) { - if (pSchema) { - taosMemoryFreeClear(pSchema); - } - pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema - if (pSchema) { - suid = msgIter.suid; - } - } - if (!pSchema) { - printf("%s:%d no valid schema\n", tags, __LINE__); - continue; - } - char __tags[128] = {0}; - snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid); - while ((row = tGetSubmitBlkNext(&blkIter))) { - tdSRowPrint(row, pSchema, __tags); - } - } - taosMemoryFreeClear(pSchema); + vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); + } return 0; } @@ -589,8 +597,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } submitBlkRsp.uid = createTbReq.uid; - submitBlkRsp.ename = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); - sprintf(submitBlkRsp.ename, "%s.%s", pVnode->config.dbname, createTbReq.name); + submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); + sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { @@ -599,6 +607,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in msgIter.suid = 0; } + vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); tDecoderClear(&decoder); } @@ -621,7 +630,7 @@ _exit: tEncoderClear(&encoder); for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) { - taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].ename); + taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName); } taosArrayDestroy(submitRsp.pArray); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index f1febb47cdbbb48d610c65738fec19d753b02758..ad0319cf69cdabf042b7c6e074be8b0dd3f7dd10 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -839,6 +839,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pMeta->uid = 0; pMeta->vgId = vg.vgId; + pMeta->tableType = TSDB_CHILD_TABLE; STableMeta* pBackup = NULL; if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) { @@ -1094,8 +1095,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { sToken.z = tbName; sToken.n = strlen(tbName); - - autoCreateTbl = true; } else { return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z); } @@ -1112,6 +1111,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (TK_USING == sToken.type) { CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); NEXT_TOKEN(pCxt->pSql, sToken); + autoCreateTbl = true; } else { CHECK_CODE(getTableMeta(pCxt, &name, tbFName)); } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 8bf1fe012eddbfa4f1666b4caba3a0610427e145..d164df6db75e236f9afcda01cf76f29015d7c6a0 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -445,7 +445,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); - SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pHashObj, NULL); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5906ee8970f7725fb10139522eab3d276ef3f7f0..a90fb7fc2e882b0a65feb87940874269d007564a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -159,6 +159,7 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; + bool needRes; bool syncSchedule; bool queryJob; bool needFlowCtrl; @@ -190,6 +191,7 @@ typedef struct SSchJob { SSchTask *fetchTask; int32_t errCode; SArray *errList; // SArray + SRWLatch resLock; void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a8b0f2e2b82ac88ed480fe5b01ed9e171a70428a..1ce074c49fb08f13688d31f0dc4a2f4e8071b820 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * } int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool syncSchedule) { + int64_t startTs, bool needRes, bool syncSchedule) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -81,6 +81,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; + pJob->attr.needRes = needRes; pJob->transport = transport; pJob->sql = sql; @@ -1133,16 +1134,39 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } case TDMT_VND_SUBMIT_RSP: { - if (msg) { - SSubmitRsp *rsp = (SSubmitRsp *)msg; - // SCH_ERR_JRET(rsp->code); - } - SCH_ERR_JRET(rspCode); - SSubmitRsp *rsp = (SSubmitRsp *)msg; - if (rsp) { - pJob->resNumOfRows += rsp->affectedRows; + if (msg) { + SDecoder coder = {0}; + SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); + tDecoderInit(&coder, msg, msgSize); + code = tDecodeSSubmitRsp(&coder, rsp); + if (code) { + SCH_TASK_ELOG("decode submitRsp failed, code:%d", code); + tFreeSSubmitRsp(rsp); + SCH_ERR_JRET(code); + } + + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); + + if (pJob->attr.needRes) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->resData) { + SSubmitRsp *sum = pJob->resData; + sum->affectedRows += rsp->affectedRows; + sum->nBlocks += rsp->nBlocks; + sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); + memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + taosMemoryFree(rsp->pBlocks); + taosMemoryFree(rsp); + } else { + pJob->resData = rsp; + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + } else { + tFreeSSubmitRsp(rsp); + } } SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -2350,7 +2374,7 @@ void schFreeJobImpl(void *job) { } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool syncSchedule) { + int64_t startTs, bool needRes, bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { @@ -2359,7 +2383,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule)); + SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule)); SCH_ERR_JRET(schLaunchJob(pJob)); @@ -2473,7 +2497,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SQueryResult *pRes) { + int64_t startTs, bool needRes, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -2481,13 +2505,17 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true)); } SSchJob *job = schAcquireJob(*pJob); pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; + if (needRes) { + pRes->res = job->resData; + job->resData = NULL; + } schReleaseJob(*pJob); @@ -2502,7 +2530,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); } else { - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); + SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index fc0e05aaf106fb11d8daa9be9a55e510aac58ff5..09ecd9fffd013c18762e3de6cf36a51f99ced1ce 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -985,7 +985,7 @@ TEST(insertTest, normalCase) { taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); SQueryResult res = {0}; - code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 5cccafa1b4ffe81373179a3421c4bc4d6a5b7072..9016dca21eac0eec35d90910672b1efec8ae6031 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -213,7 +213,7 @@ CaseCtrl gCaseCtrl = { // default #if 1 -CaseCtrl gCaseCtrl = { // default +CaseCtrl gCaseCtrl = { .bindNullNum = 0, .printCreateTblSql = true, .printQuerySql = true, @@ -233,9 +233,9 @@ CaseCtrl gCaseCtrl = { // default .printRes = true, .runTimes = 0, .caseIdx = -1, - .caseNum = 1, - .caseRunIdx = 11, - .caseRunNum = 1, + .caseNum = 15, + .caseRunIdx = 8, + .caseRunNum = 15, }; #endif