提交 4172acf9 编写于 作者: S Shengliang Guan

git lo

......@@ -60,7 +60,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件:
```bash
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config
```
### CentOS 7.9
......@@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
```
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
```
#### CentOS 8/Rocky Linux
......@@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco
sudo yum install -y epel-release
sudo yum install -y dnf-plugins-core
sudo yum config-manager --set-enabled powertools
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
```
注意:由于 snappy 缺乏 pkg-config 支持(参考 [链接](https://github.com/google/snappy/pull/86)),会导致 cmake 提示无法发现 libsnappy,实际上工作正常。
......
......@@ -62,7 +62,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed.
```bash
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config
```
### CentOS 7.9
......@@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
#### CentOS 7.9
```
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
```
#### CentOS 8/Rocky Linux
......@@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco
sudo yum install -y epel-release
sudo yum install -y dnf-plugins-core
sudo yum config-manager --set-enabled powertools
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
```
Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/google/snappy/pull/86)), it leads a cmake prompt libsnappy not found. But snappy still works well.
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e62c5ea
GIT_TAG cac24d3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -70,6 +70,11 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {
return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER;
}
/* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type
#define TSDB_IE_TYPE_SEC 1
......
......@@ -234,7 +234,6 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
......@@ -265,7 +264,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
} ENodeType;
/**
......
......@@ -796,9 +796,10 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs();
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
pWrapper->pCatalogReq->forceUpdate = false;
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
}
......
......@@ -408,7 +408,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
......
......@@ -50,7 +50,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
static bool dmFailFastFp(tmsg_t msgType) {
// add more msg type later
return msgType == TDMT_SYNC_HEARTBEAT;
return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
......@@ -301,6 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize;
pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) {
......
......@@ -119,7 +119,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
}
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
int32_t code = 0;
if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out;
}
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
_out:
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return code;
......
......@@ -454,7 +454,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(pBuf + sizeof(suid), pNode->data, keyLen);
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
......
......@@ -53,6 +53,7 @@ typedef struct {
// --------------
TSKEY nextKey; // reset by each table commit
int32_t commitFid;
int32_t expLevel;
TSKEY minKey;
TSKEY maxKey;
// commit file data
......@@ -503,6 +504,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// memory
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
#if 0
......@@ -556,7 +558,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
}
} else {
SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
wSet.diskId = did;
wSet.nSttF = 1;
......
......@@ -190,9 +190,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
version);
tAssert(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
tAssert(pVnode->state.applied + 1 == version);
pVnode->state.applied = version;
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
// skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
......
......@@ -547,6 +547,14 @@ typedef struct SCtgOperation {
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) \
do { \
if (gCTGDebug.lockEnable) { \
......
......@@ -1094,6 +1094,9 @@ _return:
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
if (code) {
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
}
if (pTask->res || code) {
ctgHandleTaskEnd(pTask, code);
}
......@@ -1124,7 +1127,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
......@@ -1144,7 +1147,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
......@@ -1162,7 +1165,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
return TSDB_CODE_SUCCESS;
}
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
......@@ -1180,7 +1183,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) {
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
ctgTaskError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
......@@ -1190,7 +1193,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
}
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
taosMemoryFreeClear(pOut->tbMeta);
......@@ -1207,11 +1210,11 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
STableMeta* stbMeta = NULL;
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta);
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
exist = 1;
taosMemoryFreeClear(stbMeta);
} else {
ctgDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
taosMemoryFreeClear(pOut->tbMeta);
taosMemoryFreeClear(stbMeta);
}
......@@ -1225,7 +1228,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
break;
}
default:
ctgError("invalid reqType %d", reqType);
ctgTaskError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
}
......@@ -1280,6 +1283,7 @@ _return:
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
}
if (pTask->res && taskDone) {
......
......@@ -95,6 +95,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
// TODO: optimize to ignore null values for linear interpolation.
if (!pLinearInfo->isStartSet) {
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
tAssert(IS_MATHABLE_TYPE(pColInfoData->info.type));
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
}
......
......@@ -44,6 +44,7 @@ typedef struct SInsertParseContext {
SParsedDataColInfo tags; // for stmt
bool missCache;
bool usingDuplicateTable;
bool forceUpdate;
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
......@@ -829,6 +830,11 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
}
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
if (pCxt->forceUpdate) {
pCxt->missCache = true;
return TSDB_CODE_SUCCESS;
}
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
......@@ -844,6 +850,11 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt
}
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
if (pCxt->forceUpdate) {
pCxt->missCache = true;
return TSDB_CODE_SUCCESS;
}
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
......@@ -1909,6 +1920,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
.missCache = false,
.usingDuplicateTable = false,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)
};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
......
......@@ -377,7 +377,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
......
......@@ -156,6 +156,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
pTask->redirectCtx.inRedirect = false;
switch (msgType) {
case TDMT_VND_COMMIT_RSP: {
SCH_ERR_JRET(rspCode);
......
......@@ -362,17 +362,12 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
}
pCtx->totalTimes++;
pCtx->roundTimes++;
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
pCtx->roundTotal = pEpSet->numOfEps;
pCtx->roundTimes = 0;
pTask->delayExecMs = 0;
goto _return;
}
pCtx->roundTimes++;
if (pCtx->roundTimes >= pCtx->roundTotal) {
int64_t nowTs = taosGetTimestampMs();
......
......@@ -79,7 +79,6 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len);
void syncUtilMsgHtoN(void* msg);
void syncUtilMsgNtoH(void* msg);
bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType);
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
......
......@@ -791,9 +791,9 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
}
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
tAssert(pNode->pLogStore != NULL && "log store not created");
tAssert(pNode->pFsm != NULL && "pFsm not registered");
tAssert(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
tAssertS(pNode->pLogStore != NULL, "log store not created");
tAssertS(pNode->pFsm != NULL, "pFsm not registered");
tAssertS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
......@@ -1144,8 +1144,8 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
}
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
tAssert(pSyncNode->pLogStore != NULL && "log store not created");
tAssert(pSyncNode->pLogBuf != NULL && "ring log buffer not created");
tAssertS(pSyncNode->pLogStore != NULL, "log store not created");
tAssertS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
......@@ -2663,7 +2663,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
tAssert(false && "failed to append blocking msg");
tAssertS(false, "failed to append blocking msg");
}
return code;
}
......
......@@ -50,7 +50,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
// initial log buffer with at least one item, e.g. commitIndex
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
tAssert(pMatch != NULL && "no matched log entry");
tAssertS(pMatch != NULL, "no matched log entry");
tAssert(pMatch->index + 1 == index);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
......@@ -86,14 +86,14 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
if (prevIndex >= pBuf->startIndex) {
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
tAssert(pEntry != NULL && "no log entry found");
tAssertS(pEntry != NULL, "no log entry found");
prevLogTerm = pEntry->term;
return prevLogTerm;
}
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
tAssert(timeMs != 0 && "no log entry found");
tAssertS(timeMs != 0, "no log entry found");
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
tAssert(prevIndex == 0 || prevLogTerm != 0);
return prevLogTerm;
......@@ -141,9 +141,9 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
}
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
tAssert(pNode->pLogStore != NULL && "log store not created");
tAssert(pNode->pFsm != NULL && "pFsm not registered");
tAssert(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
tAssertS(pNode->pLogStore != NULL, "log store not created");
tAssertS(pNode->pFsm != NULL, "pFsm not registered");
tAssertS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
......@@ -437,7 +437,7 @@ _out:
}
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
tAssert(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");
tAssertS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM");
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
return 0;
......@@ -513,13 +513,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
pBuf->commitIndex = index;
if (!inBuf) {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
continue;
}
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role: %d, current term: %" PRId64,
......@@ -905,7 +900,7 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
tAssert(pNode->logReplMgrs[i] == NULL);
pNode->logReplMgrs[i] = syncLogReplMgrCreate();
pNode->logReplMgrs[i]->peerId = i;
tAssert(pNode->logReplMgrs[i] != NULL && "Out of memory.");
tAssertS(pNode->logReplMgrs[i] != NULL, "Out of memory.");
}
return 0;
}
......
......@@ -160,8 +160,6 @@ void syncUtilMsgNtoH(void* msg) {
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
......
......@@ -1195,6 +1195,8 @@ void transCloseServer(void* arg) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]);
}
} else {
uv_loop_close(srv->loop);
}
taosMemoryFree(srv->pThreadObj);
......
......@@ -85,6 +85,7 @@ print ======== step3
system sh/exec.sh -n dnode2 -s stop
sleep 3000
$t = 0
$x = 0
loop:
......@@ -126,8 +127,8 @@ print ======== step8
$lastRows = $data00
print ======== loop Times $x
if $x < 2 then
$x = $x + 1
if $t < 2 then
$t = $t + 1
goto loop
endi
......@@ -138,4 +139,4 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode8 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册