提交 faea22cc 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

此差异已折叠。
此差异已折叠。
...@@ -315,7 +315,7 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { ...@@ -315,7 +315,7 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
return 0; return 0;
} }
mTrace("update cluster uptime to %" PRId64, clusterObj.upTime); mInfo("update cluster uptime to %" PRId64, clusterObj.upTime);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
......
...@@ -178,6 +178,8 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -178,6 +178,8 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer; SMqConsumerObj *pConsumer;
void *pIter = NULL; void *pIter = NULL;
mTrace("start to process mq timer");
// rebalance cannot be parallel // rebalance cannot be parallel
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing"); mInfo("mq rebalance already in progress, do nothing");
......
...@@ -119,28 +119,30 @@ static void *mndThreadFp(void *param) { ...@@ -119,28 +119,30 @@ static void *mndThreadFp(void *param) {
lastTime++; lastTime++;
taosMsleep(100); taosMsleep(100);
if (mndGetStop(pMnode)) break; if (mndGetStop(pMnode)) break;
if (lastTime % 10 != 0) continue;
if (lastTime % (tsTtlPushInterval * 10) == 1) { int64_t sec = lastTime / 10;
if (sec % tsTtlPushInterval == 0) {
mndPullupTtl(pMnode); mndPullupTtl(pMnode);
} }
if (lastTime % (tsTransPullupInterval * 10) == 0) { if (sec % tsTransPullupInterval == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
} }
if (lastTime % (tsMqRebalanceInterval * 10) == 0) { if (sec % tsMqRebalanceInterval == 0) {
mndCalMqRebalance(pMnode); mndCalMqRebalance(pMnode);
} }
if (lastTime % (tsTelemInterval * 10) == ((tsTelemInterval - 1) * 10)) { if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode); mndPullupTelem(pMnode);
} }
if (lastTime % (tsGrantHBInterval * 10) == 0) { if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode); mndPullupGrant(pMnode);
} }
if ((lastTime % (tsUptimeInterval * 10)) == ((tsUptimeInterval - 1) * 10)) { if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode); mndIncreaseUpTime(pMnode);
} }
} }
...@@ -399,15 +401,15 @@ void mndPreClose(SMnode *pMnode) { ...@@ -399,15 +401,15 @@ void mndPreClose(SMnode *pMnode) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
/* #if 0
mInfo("vgId:1, mnode start leader transfer"); mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish // wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10); taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer"); mInfo("vgId:1, mnode waiting for leader transfer");
} }
mInfo("vgId:1, mnode finish leader transfer"); mInfo("vgId:1, mnode finish leader transfer");
*/ #endif
} }
} }
......
...@@ -834,6 +834,8 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { ...@@ -834,6 +834,8 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead); int32_t contLen = reqLen + sizeof(SMsgHead);
mInfo("start to process ttl timer");
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
......
...@@ -133,7 +133,7 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { ...@@ -133,7 +133,7 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry report"); mError("failed to send telemetry report");
} else { } else {
mTrace("succeed to send telemetry report"); mInfo("succeed to send telemetry report");
} }
taosMemoryFree(pCont); taosMemoryFree(pCont);
} }
......
...@@ -1478,6 +1478,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) { ...@@ -1478,6 +1478,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
} }
static int32_t mndProcessTransTimer(SRpcMsg *pReq) { static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
mTrace("start to process trans timer");
mndTransPullup(pReq->info.node); mndTransPullup(pReq->info.node);
return 0; return 0;
} }
......
...@@ -221,14 +221,15 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -221,14 +221,15 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return terrno; return terrno;
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
atomic_add_fetch_32(&pOldRow->refCount, 1);
sdbPrintOper(pSdb, pOldRow, "delete"); sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
pSdb->tableVer[pOldRow->type]++;
taosThreadRwlockUnlock(pLock); taosThreadRwlockUnlock(pLock);
pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
sdbCheckRow(pSdb, pOldRow); sdbCheckRow(pSdb, pOldRow);
...@@ -317,7 +318,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { ...@@ -317,7 +318,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock); taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_load_32(&pRow->refCount); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
......
...@@ -688,6 +688,8 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { ...@@ -688,6 +688,8 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
} }
} while (true); } while (true);
walApplyVer(pVnode->pWal, pVnode->state.applied);
pVnode->restored = true; pVnode->restored = true;
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
} }
......
...@@ -46,7 +46,7 @@ SRaftStore *raftStoreOpen(const char *path) { ...@@ -46,7 +46,7 @@ SRaftStore *raftStoreOpen(const char *path) {
ASSERT(pRaftStore->pFile != NULL); ASSERT(pRaftStore->pFile != NULL);
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
ASSERT(len == RAFT_STORE_BLOCK_SIZE); ASSERT(len > 0);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len); ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
ASSERT(ret == 0); ASSERT(ret == 0);
......
...@@ -201,7 +201,7 @@ void *threadFunc(void *param) { ...@@ -201,7 +201,7 @@ void *threadFunc(void *param) {
int64_t t = pInfo->tableBeginIndex; int64_t t = pInfo->tableBeginIndex;
for (; t <= pInfo->tableEndIndex;) { for (; t <= pInfo->tableEndIndex;) {
// int64_t batch = (pInfo->tableEndIndex - t); // int64_t batch = (pInfo->tableEndIndex - t);
// batch = MIN(batch, batchNum); // batch = TMIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table"); int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batchNumOfTbl;) { for (int32_t i = 0; i < batchNumOfTbl;) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册