diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e014287522079ed5dc0df01807daf5f6e3b60111..82acb2dcb3f63a14d1665e8370b03c47fe23f44b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2272,3 +2272,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/ + \ No newline at end of file diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 67f746c8a143f165bc3171bfc3055b650b10d435..cd979968f7a1da28b5cf199ac6730f51be738271 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -213,6 +213,7 @@ typedef struct SSubplan { ESubplanType subplanType; int32_t msgType; // message type for subplan, used to denote the send message type to vnode. int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. + char dbFName[TSDB_DB_FNAME_LEN]; SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node SQueryNodeStat execNodeStat; // only for scan subplan SNodeList* pChildren; // the datasource subplan,from which to fetch the result diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4e0a1e942c8884b79642815566c131726e1b0969..fe506909d9c960f945f08923f629fc1087a56c18 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -181,9 +181,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE +#define ONLY_RSP_HEAD_ERROR(_code) ((_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) -#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH) -#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code)) +#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) +#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) +#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) #define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b657ca10d912d488b831feca88c4041465a18043..ac7e217abb946f8a7df36831cce7529b0369791f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -354,6 +354,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) +#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617) // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e39520af813e2b3002dac528d47dc960aac38bd2..479eec598fd9b1e968a498c2b51b44866ae9b42c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -278,13 +278,13 @@ int32_t clientProcessErrorList(SArray **pList) { for (int32_t i = 0; i < errNum; ++i) { SQueryErrorInfo *errInfo = taosArrayGet(errList, i); - if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) { if (i == (errNum - 1)) { break; } // TODO REMOVE SAME DB ERROR - } else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) { + } else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code) || NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) { continue; } else { taosArrayRemove(errList, i); @@ -355,6 +355,28 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { } catalogRemoveTableMeta(pCatalog, &errInfo->tableName); + } else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code)) { + ++needRetryNum; + + SCatalog *pCatalog = NULL; + tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (tcode != TSDB_CODE_SUCCESS) { + ++needRetryFailNum; + code = tcode; + continue; + } + + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&errInfo->tableName, dbFName); + + tcode = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, &errInfo->tableName, -1); + if (tcode != TSDB_CODE_SUCCESS) { + ++needRetryFailNum; + code = tcode; + continue; + } } } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index d311e1e41703e70ab0846d92f6946c81648b651c..713ab1282ddd7a881c0799e5a640213f81caf080 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -805,6 +805,24 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, return code; } +static void dndGenerateResponseHead(SRpcMsg *pMsg, void **pRspHead, int *contLen) { + if (TDMT_VND_SUBMIT != pMsg->msgType && TDMT_VND_QUERY != pMsg->msgType + && TDMT_VND_CREATE_TABLE != pMsg->msgType && TDMT_VND_TABLE_META != pMsg->msgType) { + return; + } + + *pRspHead = rpcMallocCont(sizeof(SRspHead)); + if (NULL == *pRspHead) { + return; + } + + SMsgHead *pHead = pMsg->pCont; + + strcpy(((SRspHead *)(*pRspHead))->dbFName, pHead->dbFName); + + *contLen = sizeof(SRspHead); +} + static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); @@ -815,6 +833,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); if (pMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; + dndGenerateResponseHead(pMsg, &rsp.pCont, &rsp.contLen); rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index e9978f208e6bd7d7cc0a284147cd4f7967778007..96e0bf11b352e9e7a74912bf3076a4d8a2ebc64b 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -83,21 +83,20 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { for (int i = 0; i < reqNum; i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); - // TODO OPEN THIS - #if 0 char tableFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&pCreateTbReq->name, tableFName); + SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + sprintf(tableFName, "%s.%s", pHead->dbFName, pCreateTbReq->name); int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); if (code) { SVCreateTbRsp rsp; rsp.code = code; - memcpy(rsp.tableName, pCreateTbReq->name, sizeof(rsp.tableName)); + tNameFromString(&rsp.tableName, tableFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (NULL == vCreateTbBatchRsp.rspList) { vCreateTbBatchRsp.rspList = taosArrayInit(reqNum - i, sizeof(SVCreateTbRsp)); if (NULL == vCreateTbBatchRsp.rspList) { - vError("vgId:%d, failed to init array: %d", reqNum - i); + vError("vgId:%d, failed to init array: %d", pVnode->vgId, reqNum - i); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -106,8 +105,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } - #endif - if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 853f3b52b1f80d20c3e4d38c9594438b0e2a9b9e..7f95bd55799c0f46bd3cf8255504bc0e1cca7102 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1722,18 +1722,17 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c if (inCache) { input.dbId = dbCache->dbId; - input.vgVersion = dbCache->vgInfo->vgVersion; - input.numOfTable = dbCache->vgInfo->numOfTable; ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); - } else { - input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } + + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + input.numOfTable = 0; code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut); if (code) { - if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) { + if (CTG_DB_NOT_EXIST(code) && inCache) { ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId); ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId); } @@ -2660,7 +2659,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, false)); + CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, true)); } int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 54fe7358a84639c7c1b1985dffb4f8ab34fafa9d..3a8084741a198f090ecd87dbc11bf18305e6e681 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8201,6 +8201,22 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa } } + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->meta, pScanPhyNode->uid); + if (pTbCfg == NULL) { + tb_uid_t uid = 0; + pTbCfg = metaGetTbInfoByName(pHandle->meta, pScanPhyNode->tableName.tname, &uid); + if (pTbCfg) { + errInfo->code = TSDB_CODE_TDB_TABLE_RECREATED; + errInfo->tableName = pScanPhyNode->tableName; + return NULL; + } + + errInfo->code = TSDB_CODE_TDB_INVALID_TABLE_ID; + errInfo->tableName = pScanPhyNode->tableName; + return NULL; + } + + size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); if (NULL == pDataReader) { diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index cbc35240a3c766fcc414733659042227dcd8b893..53efd15f103e7dff4bc01708af540aafed623511 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -78,6 +78,8 @@ typedef struct STableDataBlocks { char *pData; bool cloned; STagData tagData; + char tableName[TSDB_TABLE_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; SParsedDataColInfo boundColumnInfo; SRowBuilder rowBuilder; @@ -115,10 +117,11 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t rowType, } } -static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { - pBlocks->tid = pTableMeta->suid; - pBlocks->uid = pTableMeta->uid; - pBlocks->sversion = pTableMeta->sversion; +static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) { + pBlocks->tid = dataBuf->pTableMeta->suid; + pBlocks->uid = dataBuf->pTableMeta->uid; + pBlocks->sversion = dataBuf->pTableMeta->sversion; + strcpy(pBlocks->tableName, dataBuf->tableName); if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { return TSDB_CODE_TSC_INVALID_OPERATION; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 43cc30848365c11322a4495ab14291e2e01bf453..3bcec81bb091073b9c9488f28e038e52e40cafb0 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -50,6 +50,8 @@ typedef struct SInsertParseContext { SParseContext* pComCxt; // input char *pSql; // input SMsgBuf msg; // input + char dbFName[TSDB_DB_FNAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; STableMeta* pTableMeta; // each table SParsedDataColInfo tags; // each table SKVRowBuilder tagsBuilder; // each table @@ -228,6 +230,9 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); pCxt->pTableMeta->vgId = vg.vgId; // todo remove + strcpy(pCxt->tableName, name.tname); + tNameGetFullDbName(&name, pCxt->dbFName); + return TSDB_CODE_SUCCESS; } @@ -241,10 +246,11 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS return -1; } -static void buildMsgHeader(SVgDataBlocks* blocks) { +static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { SSubmitReq* submit = (SSubmitReq*)blocks->pData; submit->header.vgId = htonl(blocks->vg.vgId); submit->header.contLen = htonl(blocks->size); + strcpy(submit->header.dbFName, src->dbFName); submit->length = submit->header.contLen; submit->numOfBlocks = htonl(blocks->numOfTables); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); @@ -278,7 +284,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) { dst->numOfTables = src->numOfTables; dst->size = src->size; TSWAP(dst->pData, src->pData, char*); - buildMsgHeader(dst); + buildMsgHeader(src, dst); taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); } return TSDB_CODE_SUCCESS; @@ -893,7 +899,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows)); SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) { + if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); } @@ -970,7 +976,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { STableDataBlocks *dataBuf = NULL; CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL)); - + strcpy(dataBuf->tableName, pCxt->tableName); + strcpy(dataBuf->dbFName, pCxt->dbFName); + if (TK_NK_LP == sToken.type) { // pSql -> field1_name, ...) CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta))); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 59c9963623de6d5f599e9e7f05d5397fe25de289..f67d56324ad1d573dbc8bd4306fd5ad07e11e7b0 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1239,6 +1239,7 @@ static void destroyTranslateContext(STranslateContext* pCxt) { typedef struct SVgroupTablesBatch { SVCreateTbBatchReq req; SVgroupInfo info; + char dbName[TSDB_DB_NAME_LEN]; } SVgroupTablesBatch; static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema) { @@ -1254,7 +1255,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) { } static int32_t buildNormalTableBatchReq( - const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { + const char* pDbName, const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { SVCreateTbReq req = {0}; req.type = TD_NORMAL_TABLE; req.name = strdup(pTableName); @@ -1272,6 +1273,7 @@ static int32_t buildNormalTableBatchReq( } pBatch->info = *pVgroupInfo; + strcpy(pBatch->dbName, pDbName); pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); if (NULL == pBatch->req.pArray) { destroyCreateTbReq(&req); @@ -1282,7 +1284,7 @@ static int32_t buildNormalTableBatchReq( return TSDB_CODE_SUCCESS; } -static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { +static int32_t serializeVgroupTablesBatch(int32_t acctId, SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); void* buf = malloc(tlen); if (NULL == buf) { @@ -1290,6 +1292,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* } ((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId); ((SMsgHead*)buf)->contLen = htonl(tlen); + sprintf(((SMsgHead*)buf)->dbFName, "%d.%s", acctId, pTbBatch->dbName); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req)); @@ -1351,16 +1354,16 @@ static void destroyCreateTbReqArray(SArray* pArray) { taosArrayDestroy(pArray); } -static int32_t buildCreateTableDataBlock(const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo, SArray** pBufArray) { +static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo, SArray** pBufArray) { *pBufArray = taosArrayInit(1, POINTER_BYTES); if (NULL == *pBufArray) { return TSDB_CODE_OUT_OF_MEMORY; } SVgroupTablesBatch tbatch = {0}; - int32_t code = buildNormalTableBatchReq(pStmt->tableName, pStmt->pCols, pInfo, &tbatch); + int32_t code = buildNormalTableBatchReq(pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch); if (TSDB_CODE_SUCCESS == code) { - code = serializeVgroupTablesBatch(&tbatch, *pBufArray); + code = serializeVgroupTablesBatch(acctId, &tbatch, *pBufArray); } destroyCreateTbReqBatch(&tbatch); @@ -1377,7 +1380,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { int32_t code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info); SArray* pBufArray = NULL; if (TSDB_CODE_SUCCESS == code) { - code = buildCreateTableDataBlock(pStmt, &info, &pBufArray); + code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray); } if (TSDB_CODE_SUCCESS == code) { code = rewriteToVnodeModifOpStmt(pQuery, pBufArray); @@ -1389,7 +1392,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { return code; } -static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { +static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pTableName); @@ -1400,6 +1403,7 @@ static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pTabl if (pTableBatch == NULL) { SVgroupTablesBatch tBatch = {0}; tBatch.info = *pVgInfo; + strcpy(tBatch.dbName, pDbName); tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); taosArrayPush(tBatch.req.pArray, &req); @@ -1546,7 +1550,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->tableName, row, pSuperTableMeta->uid, &info); + addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info); } tfree(pSuperTableMeta); @@ -1554,7 +1558,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla return code; } -static SArray* serializeVgroupsTablesBatch(SHashObj* pVgroupHashmap) { +static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHashmap) { SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); if (NULL == pBufArray) { return NULL; @@ -1568,7 +1572,7 @@ static SArray* serializeVgroupsTablesBatch(SHashObj* pVgroupHashmap) { break; } - serializeVgroupTablesBatch(pTbBatch, pBufArray); + serializeVgroupTablesBatch(acctId, pTbBatch, pBufArray); destroyCreateTbReqBatch(pTbBatch); } while (true); @@ -1593,7 +1597,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) } } - SArray* pBufArray = serializeVgroupsTablesBatch(pVgroupHashmap); + SArray* pBufArray = serializeVgroupsTablesBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); taosHashCleanup(pVgroupHashmap); if (NULL == pBufArray) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5af26b3e32cf6168b79aa1da7194f786d42c3665..cf7d467b80d2ce60fb478cfba4f45c5e3cb2d87d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -256,6 +256,7 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p pTableScan->scanRange = pScanLogicNode->scanRange; vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); return (SPhysiNode*)pTableScan; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2e8f8a44d4b538c3c78ef79d407b1661d04cf1b9..1a4f09d0c8a29ff074273980489e5bfa9d01a8f7 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -933,6 +933,29 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL)); } +int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRspHead *head, SArray **errList) { + SQueryErrorInfo errInfo = {0}; + errInfo.code = errCode; + if (tNameFromString(&errInfo.tableName, head->dbFName, T_NAME_ACCT | T_NAME_DB)) { + SCH_TASK_ELOG("invalid rsp head, dbFName:%s", head->dbFName); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + *errList = taosArrayInit(1, sizeof(SQueryErrorInfo)); + if (NULL == *errList) { + SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (NULL == taosArrayPush(*errList, &errInfo)) { + SCH_TASK_ELOG("taosArrayPush err to errList failed, dbFName:%s", head->dbFName); + taosArrayDestroy(*errList); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; +} + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { @@ -954,6 +977,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_CREATE_TABLE_RSP: { SVCreateTbBatchRsp batchRsp = {0}; if (msg) { + if (ONLY_RSP_HEAD_ERROR(rspCode)) { + SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); + errInfoGot = true; + SCH_ERR_JRET(rspCode); + } + tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp); if (batchRsp.rspList) { int32_t num = taosArrayGetSize(batchRsp.rspList); @@ -973,7 +1002,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch taosArrayPush(errList, &errInfo); } - taosArrayDestroy(batchRsp.rspList); + taosArrayDestroy(batchRsp.rspList); errInfoGot = true; } } @@ -984,23 +1013,22 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } case TDMT_VND_SUBMIT_RSP: { - #if 0 //TODO OPEN THIS - SSubmitRsp *rsp = (SSubmitRsp *)msg; + if (msg) { + if (ONLY_RSP_HEAD_ERROR(rspCode)) { + SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); + errInfoGot = true; + SCH_ERR_JRET(rspCode); + } + + SSubmitRsp *rsp = (SSubmitRsp *)msg; + + SCH_ERR_JRET(rsp->code); - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + pJob->resNumOfRows += rsp->affectedRows; } - pJob->resNumOfRows += rsp->affectedRows; - #else SCH_ERR_JRET(rspCode); - SSubmitRsp *rsp = (SSubmitRsp *)msg; - if (rsp) { - pJob->resNumOfRows += rsp->affectedRows; - } - #endif - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -1008,6 +1036,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_QUERY_RSP: { SQueryTableRsp rsp = {0}; if (msg) { + if (ONLY_RSP_HEAD_ERROR(rspCode)) { + SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); + errInfoGot = true; + SCH_ERR_JRET(rspCode); + } + tDeserializeSQueryTableRsp(msg, msgSize, &rsp); if (rsp.code) { errInfo.code = rsp.code; @@ -1326,6 +1360,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); + strcpy(pMsg->header.dbFName, pTask->plan->dbFName); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a93a07648ab8f6923c92892349ba46f455ca4800..facf8df057498fc58bf5b910c3cafc79b3094766 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -350,6 +350,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")