diff --git a/README-CN.md b/README-CN.md index 437d671bb97a1009f5d48d2423a3d17f75bf1494..a5d239a53296b5a7d96a3e1624879386ab57bf13 100644 --- a/README-CN.md +++ b/README-CN.md @@ -60,7 +60,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev 为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件: ```bash -sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config +sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config ``` ### CentOS 7.9 @@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel ``` -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` #### CentOS 8/Rocky Linux @@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco sudo yum install -y epel-release sudo yum install -y dnf-plugins-core sudo yum config-manager --set-enabled powertools -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` 注意:由于 snappy 缺乏 pkg-config 支持(参考 [链接](https://github.com/google/snappy/pull/86)),会导致 cmake 提示无法发现 libsnappy,实际上工作正常。 diff --git a/README.md b/README.md index 6aec756ec76d6801bfe72e3c66bc36a325d8245e..05c1c075f0c3b3894c93d0aa2e126a9dc6340758 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed. ```bash -sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config +sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config ``` ### CentOS 7.9 @@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel #### CentOS 7.9 ``` -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` #### CentOS 8/Rocky Linux @@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco sudo yum install -y epel-release sudo yum install -y dnf-plugins-core sudo yum config-manager --set-enabled powertools -sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel +sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel ``` Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/google/snappy/pull/86)), it leads a cmake prompt libsnappy not found. But snappy still works well. diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 2d7bcf6592cddb101a53c7445f83a293a2fed0db..5683a4358e43ccb399331ef1ed8f22013b8e18d1 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG e62c5ea + GIT_TAG cac24d3 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 470b7bba7e738e1a4100e94e466e723e702f2424..bb1addf1b698479ca418882a1a16c3c59e5347cc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,6 +70,11 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL); } + +static inline bool syncUtilUserCommit(tmsg_t msgType) { + return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; +} + /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type #define TSDB_IE_TYPE_SEC 1 diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index f94e0c98dc2d4614a8d743f6a7ab07465b46b700..412054b13e3bdceec6234919040c4b99a43d6102 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -234,7 +234,6 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, - QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, @@ -265,7 +264,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QUERY_NODE_PHYSICAL_PLAN_DELETE, QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN + QUERY_NODE_PHYSICAL_PLAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN } ENodeType; /** diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1c8f83d0186bf37abae1db936b0ca4656d22c3ab..134e410f2b1dcb6c8db17f078b47192860917272 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -796,9 +796,10 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { + pWrapper->pCatalogReq->forceUpdate = false; code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 26c8bd358680de5588e2bd0d29088e17771234a9..1c2407ad45d573e577adcc178c03953a8243e85f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -408,7 +408,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = tsNumOfCores * 2; tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); - if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, 0) != 0) return -1; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 04c12abcf995622f8985313ddf65bc2d5cc381c0..99f8bd002a5944ec1f541c18ec523efab41e977e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -50,7 +50,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { static bool dmFailFastFp(tmsg_t msgType) { // add more msg type later - return msgType == TDMT_SYNC_HEARTBEAT; + return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES; } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { @@ -301,6 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; + rpcInit.compressSize = tsCompressMsgSize; pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 895d7fedc4ff2dff78ea3c81f262d04c7e4a3be3..7cba69ed50d64ec8dd5ebc06eaa0bc3438afba4a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -119,7 +119,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta } int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta); + int32_t code = 0; + if (!syncUtilUserCommit(pMsg->msgType)) { + goto _out; + } + code = mndProcessWriteMsg(pFsm, pMsg, pMeta); + +_out: rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return code; diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 2611c37fd43a6964ca28b8bf47274c75c9363c2d..21f63e6e985e4adaa101df0d8b6d7e4afd1aebc9 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -454,7 +454,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { - memcpy(pBuf + sizeof(suid), pNode->data, keyLen); + memcpy(&pBuf[1], pNode->data, keyLen); // check whether it is existed in LRU cache, and remove it from linked list if not. LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ab9f405e00a2ba684b5b8f8c69779659da946c92..8cc6666e4dbb3c0b3cd2003eb2a2a92a95003afb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -53,6 +53,7 @@ typedef struct { // -------------- TSKEY nextKey; // reset by each table commit int32_t commitFid; + int32_t expLevel; TSKEY minKey; TSKEY maxKey; // commit file data @@ -503,6 +504,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // memory pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); + pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, &pCommitter->maxKey); #if 0 @@ -556,7 +558,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } } else { SDiskID did = {0}; - tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); + if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); wSet.diskId = did; wSet.nSttF = 1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0d1d63826aedcb47fbb100af94dd116efd6e9864..71cca88ffb7b94d8822b639d0b20d694f03f2c4f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -190,9 +190,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp version); tAssert(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); + tAssert(pVnode->state.applied + 1 == version); + pVnode->state.applied = version; pVnode->state.applyTerm = pMsg->info.conn.applyTerm; + if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; + // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index ba9990658955f29b437ae0f17c36e24b3a6e84c3..c435c46d467f26bd4818f8074a4d90a263fe0e21 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -547,6 +547,14 @@ typedef struct SCtgOperation { #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) +#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__) + + #define CTG_LOCK_DEBUG(...) \ do { \ if (gCTGDebug.lockEnable) { \ diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 7138e0ce16a4c3543bf145251640447fd986e54d..5b01ac1fb96007234863d7c96f7aaa1e75b3c962 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1094,6 +1094,9 @@ _return: ctgReleaseVgInfoToCache(pCtg, dbCache); } + if (code) { + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); + } if (pTask->res || code) { ctgHandleTaskEnd(pTask, code); } @@ -1124,7 +1127,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); *vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); @@ -1144,7 +1147,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu SVgroupInfo vgInfo = {0}; CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); - ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); *vgId = vgInfo.vgId; CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); @@ -1162,7 +1165,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu return TSDB_CODE_SUCCESS; } - ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); + ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); @@ -1180,7 +1183,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; if (CTG_IS_META_NULL(pOut->metaType)) { - ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); + ctgTaskError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); ctgRemoveTbMetaFromCache(pCtg, pName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1190,7 +1193,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu } if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { - ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); taosMemoryFreeClear(pOut->tbMeta); @@ -1207,11 +1210,11 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu STableMeta* stbMeta = NULL; (void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta); if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) { - ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); exist = 1; taosMemoryFreeClear(stbMeta); } else { - ctgDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName)); + ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName)); taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(stbMeta); } @@ -1225,7 +1228,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu break; } default: - ctgError("invalid reqType %d", reqType); + ctgTaskError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); } @@ -1280,6 +1283,7 @@ _return: TSWAP(pTask->res, ctx->pResList); taskDone = true; } + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); } if (pTask->res && taskDone) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 2374d80bbf3f15430230f95177b4efe9beba2e4c..28b026aa231e868626a73d738d31247600334dc5 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -95,6 +95,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo // TODO: optimize to ignore null values for linear interpolation. if (!pLinearInfo->isStartSet) { if (!colDataIsNull_s(pColInfoData, rowIndex)) { + tAssert(IS_MATHABLE_TYPE(pColInfoData->info.type)); + pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index df7a3953f219497d9b3a15377c90cc12a633f08e..59545d8fbc1c1a46da78e747abfbf5363501fdf5 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -44,6 +44,7 @@ typedef struct SInsertParseContext { SParsedDataColInfo tags; // for stmt bool missCache; bool usingDuplicateTable; + bool forceUpdate; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -829,6 +830,11 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo } static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { + if (pCxt->forceUpdate) { + pCxt->missCache = true; + return TSDB_CODE_SUCCESS; + } + int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); @@ -844,6 +850,11 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt } static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { + if (pCxt->forceUpdate) { + pCxt->missCache = true; + return TSDB_CODE_SUCCESS; + } + int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache); @@ -1909,6 +1920,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, .missCache = false, .usingDuplicateTable = false, + .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) }; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 39118910f1f975db2c43378e01e61be2c6e2e3aa..48df7e36a30737324342bb06bdbbfced8b73a5cd 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -377,7 +377,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \ - (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)))) + (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect)) #define SCH_REDIRECT_MSGTYPE(_msgType) \ ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \ (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 560ce0bdc36f506fe72c9b56ea5d66a44a5bd228..c23b461b4795b1ee8617a074179bf0a280d35cb4 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -156,6 +156,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } + pTask->redirectCtx.inRedirect = false; + switch (msgType) { case TDMT_VND_COMMIT_RSP: { SCH_ERR_JRET(rspCode); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index cd69834924d69b43e29ae71aa1ab0ec6f9245f77..8e60222ca6d4d53b189cffaf05fbae8c7f5dcdde 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -362,17 +362,12 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, } pCtx->totalTimes++; + pCtx->roundTimes++; if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) { pCtx->roundTotal = pEpSet->numOfEps; - pCtx->roundTimes = 0; - - pTask->delayExecMs = 0; - - goto _return; } - pCtx->roundTimes++; if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f198f3809d7eaa307a189487e74077009f64bb4d..be14ef91a40ef6b10c76a5bcf34ca8953e00bed1 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -79,7 +79,6 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len); void syncUtilMsgHtoN(void* msg); void syncUtilMsgNtoH(void* msg); bool syncUtilUserPreCommit(tmsg_t msgType); -bool syncUtilUserCommit(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType); void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e2887f7b1249cb181bcd37f0d23105c7679e5ac8..f4caf0e9c7956dd6513ac4ffbca7db77e8420a90 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -791,9 +791,9 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { } int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { - tAssert(pNode->pLogStore != NULL && "log store not created"); - tAssert(pNode->pFsm != NULL && "pFsm not registered"); - tAssert(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + tAssertS(pNode->pLogStore != NULL, "log store not created"); + tAssertS(pNode->pFsm != NULL, "pFsm not registered"); + tAssertS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); SSnapshot snapshot; if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); @@ -1144,8 +1144,8 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { } int32_t syncNodeRestore(SSyncNode* pSyncNode) { - tAssert(pSyncNode->pLogStore != NULL && "log store not created"); - tAssert(pSyncNode->pLogBuf != NULL && "ring log buffer not created"); + tAssertS(pSyncNode->pLogStore != NULL, "log store not created"); + tAssertS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); @@ -2663,7 +2663,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t code = syncNodeAppend(ths, pEntry); if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) { - tAssert(false && "failed to append blocking msg"); + tAssertS(false, "failed to append blocking msg"); } return code; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 4a69b263a4debfc3bc5fa4c74c5e25866a26091b..5caacd38a44c6868094348b4b131158d64375694 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -50,7 +50,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt // initial log buffer with at least one item, e.g. commitIndex SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - tAssert(pMatch != NULL && "no matched log entry"); + tAssertS(pMatch != NULL, "no matched log entry"); tAssert(pMatch->index + 1 == index); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; @@ -86,14 +86,14 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S if (prevIndex >= pBuf->startIndex) { pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; - tAssert(pEntry != NULL && "no log entry found"); + tAssertS(pEntry != NULL, "no log entry found"); prevLogTerm = pEntry->term; return prevLogTerm; } if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; - tAssert(timeMs != 0 && "no log entry found"); + tAssertS(timeMs != 0, "no log entry found"); prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; tAssert(prevIndex == 0 || prevLogTerm != 0); return prevLogTerm; @@ -141,9 +141,9 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex } int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - tAssert(pNode->pLogStore != NULL && "log store not created"); - tAssert(pNode->pFsm != NULL && "pFsm not registered"); - tAssert(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); + tAssertS(pNode->pLogStore != NULL, "log store not created"); + tAssertS(pNode->pFsm != NULL, "pFsm not registered"); + tAssertS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); SSnapshot snapshot; if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { @@ -437,7 +437,7 @@ _out: } int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { - tAssert(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); + tAssertS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM"); if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { return 0; @@ -513,13 +513,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm if (!syncUtilUserCommit(pEntry->originalRpcType)) { sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); - pBuf->commitIndex = index; - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - continue; } + if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role: %d, current term: %" PRId64, @@ -905,7 +900,7 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { tAssert(pNode->logReplMgrs[i] == NULL); pNode->logReplMgrs[i] = syncLogReplMgrCreate(); pNode->logReplMgrs[i]->peerId = i; - tAssert(pNode->logReplMgrs[i] != NULL && "Out of memory."); + tAssertS(pNode->logReplMgrs[i] != NULL, "Out of memory."); } return 0; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index e2abb9b6db272e89794ffda08cb184b63e329fda..6cd5c8ede01f2f442bc207b7e2ebe6dbe58ade66 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -160,8 +160,6 @@ void syncUtilMsgNtoH(void* msg) { bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } -bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } - bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 73c6dc4011f5d1e4baf9e29f81246ff4b0b1f0c5..2b1f68d5f64952cbd933e3c20c1c507c2f2be966 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1195,6 +1195,8 @@ void transCloseServer(void* arg) { sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); } + } else { + uv_loop_close(srv->loop); } taosMemoryFree(srv->pThreadObj); diff --git a/tests/script/tsim/vnode/replica3_repeat.sim b/tests/script/tsim/vnode/replica3_repeat.sim index 8efba515ae5778afdf6cd3f36079d4426741601d..10e18d01d07eda4957fa98050496d4b06f81126a 100644 --- a/tests/script/tsim/vnode/replica3_repeat.sim +++ b/tests/script/tsim/vnode/replica3_repeat.sim @@ -85,6 +85,7 @@ print ======== step3 system sh/exec.sh -n dnode2 -s stop sleep 3000 +$t = 0 $x = 0 loop: @@ -126,8 +127,8 @@ print ======== step8 $lastRows = $data00 print ======== loop Times $x -if $x < 2 then - $x = $x + 1 +if $t < 2 then + $t = $t + 1 goto loop endi @@ -138,4 +139,4 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT system sh/exec.sh -n dnode5 -s stop -x SIGINT system sh/exec.sh -n dnode6 -s stop -x SIGINT system sh/exec.sh -n dnode7 -s stop -x SIGINT -system sh/exec.sh -n dnode8 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode8 -s stop -x SIGINT