From 566b749cc5198a07eb13eef2a0374a9aa5d84c6c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 Nov 2020 12:32:42 +0800 Subject: [PATCH] TD-2046 --- src/mnode/inc/mnodeSdb.h | 5 +- src/mnode/src/mnodeSdb.c | 256 ++++++++++++++---------------------- src/mnode/src/mnodeTable.c | 2 +- src/mnode/src/mnodeVgroup.c | 2 +- src/sync/src/syncRestore.c | 2 +- tests/script/sh/deploy.sh | 36 ++--- 6 files changed, 125 insertions(+), 178 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 04fee5f6d5..965baf7c0d 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "mnode.h" +#include "twal.h" struct SSdbTable; @@ -60,6 +61,8 @@ typedef struct SSWriteMsg { void * pRow; SMnodeMsg *pMsg; struct SSdbTable *pTable; + char reserveForSync[16]; + SWalHead pHead[]; } SSWriteMsg; typedef struct { @@ -89,7 +92,7 @@ void sdbUpdateMnodeRoles(); int32_t sdbInsertRow(SSWriteMsg *pWrite); int32_t sdbDeleteRow(SSWriteMsg *pWrite); int32_t sdbUpdateRow(SSWriteMsg *pWrite); -int32_t sdbInsertRowImp(SSWriteMsg *pWrite); +int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite); void *sdbGetRow(void *pTable, void *key); void *sdbFetchRow(void *pTable, void *pIter, void **ppRow); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 12cf5757b3..15e638d436 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -34,7 +34,7 @@ #include "mnodeSdb.h" #define SDB_TABLE_LEN 12 -#define SDB_SYNC_HACK 16 +#define MAX_QUEUED_MSG_NUM 10000 typedef enum { SDB_ACTION_INSERT = 0, @@ -82,6 +82,7 @@ typedef struct { int64_t sync; void * wal; SSyncCfg cfg; + int32_t queuedMsg; int32_t numOfTables; SSdbTable *tableList[SDB_TABLE_MAX]; pthread_mutex_t mutex; @@ -105,16 +106,14 @@ static taos_qall tsSdbWQall; static taos_queue tsSdbWQueue; static SSdbWorkerPool tsSdbPool; -static int32_t sdbWrite(void *pWrite, void *pHead, int32_t qtype, void *unused); -static int32_t sdbWriteToQueue(void *pWrite, void *pHead, int32_t qtype, void *unused); +static int32_t sdbProcessWrite(void *pWrite, void *pHead, int32_t qtype, void *unused); +static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); +static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action); static void * sdbWorkerFp(void *pWorker); static int32_t sdbInitWorker(); static void sdbCleanupWorker(); static int32_t sdbAllocQueue(); static void sdbFreeQueue(); -extern int32_t sdbInsertRowImp(SSWriteMsg *pWrite); -static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite); -static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite); static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite); @@ -181,7 +180,7 @@ static int32_t sdbInitWal() { } sdbInfo("vgId:1, open wal for restore"); - int code = walRestore(tsSdbMgmt.wal, NULL, sdbWrite); + int32_t code = walRestore(tsSdbMgmt.wal, NULL, sdbProcessWrite); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, failed to open wal for restore since %s", tstrerror(code)); return -1; @@ -250,7 +249,7 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { // failed to forward, need revert insert static void sdbHandleFailedConfirm(SSWriteMsg *pWrite) { - SWalHead *pHead = (SWalHead *)((char *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); + SWalHead *pHead = pWrite->pHead; int32_t action = pHead->msgType % 10; sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow, @@ -285,13 +284,6 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { } dnodeSendRpcMWriteRsp(pMsg, pWrite->code); - - // if ahandle, means this func is called by sdb write - if (ahandle == NULL) { - sdbDecRef(pWrite->pTable, pWrite->pRow); - } - - taosFreeQitem(pWrite); } static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); } @@ -379,7 +371,7 @@ void sdbUpdateSync(void *pMnodes) { syncInfo.ahandle = NULL; syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; - syncInfo.writeToCache = sdbWriteToQueue; + syncInfo.writeToCache = sdbWriteWalToQueue; syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbMgmt.cfg = syncCfg; @@ -389,6 +381,7 @@ void sdbUpdateSync(void *pMnodes) { } else { tsSdbMgmt.sync = syncStart(&syncInfo); } + sdbUpdateMnodeRoles(); } @@ -565,7 +558,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) { return TSDB_CODE_SUCCESS; } -static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { +static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { SSWriteMsg *pWrite = wparam; SWalHead *pHead = hparam; int32_t tableId = pHead->msgType / 10; @@ -665,8 +658,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (sdbGetRowFromObj(pTable, pWrite->pRow)) { - sdbError("vgId:1, sdb:%s, failed to insert key:%s since it already exist", pTable->name, - sdbGetRowStr(pTable, pWrite->pRow)); + sdbError("vgId:1, sdb:%s, failed to insert:%s since it exist", pTable->name, sdbGetRowStr(pTable, pWrite->pRow)); sdbDecRef(pTable, pWrite->pRow); return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; } @@ -675,14 +667,14 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { *((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1); // let vgId increase from 2 - if (pTable->autoIndex == 1 && strcmp(pTable->name, "vgroups") == 0) { + if (pTable->autoIndex == 1 && pTable->id == SDB_TABLE_VGROUP) { *((uint32_t *)pWrite->pRow) = atomic_add_fetch_32(&pTable->autoIndex, 1); } } int32_t code = sdbInsertHash(pTable, pWrite); if (code != TSDB_CODE_SUCCESS) { - sdbError("vgId:1, sdb:%s, failed to insert into hash", pTable->name); + sdbError("vgId:1, sdb:%s, failed to insert:%s into hash", pTable->name, sdbGetRowStr(pTable, pWrite->pRow)); return code; } @@ -694,37 +686,8 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { if (pWrite->fpReq) { return (*pWrite->fpReq)(pWrite->pMsg); } else { - return sdbInsertRowImp(pWrite); - } -} - -int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; - if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - - int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewWrite = taosAllocateQitem(size); - - SWalHead *pHead = (SWalHead *)((char *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); - pHead->version = 0; - pHead->len = pWrite->rowSize; - pHead->msgType = pTable->id * 10 + SDB_ACTION_INSERT; - - pWrite->rowData = pHead->cont; - (*pTable->fpEncode)(pWrite); - pHead->len = pWrite->rowSize; - - memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - - if (pNewWrite->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, insert action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, - pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); + return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); } - - sdbIncRef(pNewWrite->pTable, pNewWrite->pRow); - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; } bool sdbCheckRowDeleted(void *tparam, void *pRow) { @@ -745,55 +708,24 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) { return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - sdbIncRef(pTable, pWrite->pRow); - int32_t code = sdbDeleteHash(pTable, pWrite); if (code != TSDB_CODE_SUCCESS) { sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->name); - sdbDecRef(pTable, pWrite->pRow); return code; } // just delete data from memory if (pWrite->type != SDB_OPER_GLOBAL) { - sdbDecRef(pTable, pWrite->pRow); return TSDB_CODE_SUCCESS; } if (pWrite->fpReq) { return (*pWrite->fpReq)(pWrite->pMsg); } else { - return sdbDeleteRowImp(pWrite); + return sdbWriteRowToQueue(pWrite, SDB_ACTION_DELETE); } } -int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; - if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - - int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewWrite = taosAllocateQitem(size); - - SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); - pHead->version = 0; - pHead->msgType = pTable->id * 10 + SDB_ACTION_DELETE; - - pWrite->rowData = pHead->cont; - (*pTable->fpEncode)(pWrite); - pHead->len = pWrite->rowSize; - - memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - - if (pNewWrite->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, delete action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, - pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); - } - - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} - int32_t sdbUpdateRow(SSWriteMsg *pWrite) { SSdbTable *pTable = pWrite->pTable; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; @@ -818,38 +750,10 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) { if (pWrite->fpReq) { return (*pWrite->fpReq)(pWrite->pMsg); } else { - return sdbUpdateRowImp(pWrite); + return sdbWriteRowToQueue(pWrite, SDB_ACTION_UPDATE); } } -int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { - SSdbTable *pTable = pWrite->pTable; - if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - - int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; - SSWriteMsg *pNewWrite = taosAllocateQitem(size); - - SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK); - pHead->version = 0; - pHead->msgType = pTable->id * 10 + SDB_ACTION_UPDATE; - - pWrite->rowData = pHead->cont; - (*pTable->fpEncode)(pWrite); - pHead->len = pWrite->rowSize; - - memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg)); - - if (pNewWrite->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, update action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle, - pNewWrite->pMsg, pTable->name, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow)); - } - - sdbIncRef(pNewWrite->pTable, pNewWrite->pRow); - taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite); - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} - void *sdbFetchRow(void *tparam, void *pNode, void **ppRow) { SSdbTable *pTable = tparam; *ppRow = NULL; @@ -942,7 +846,7 @@ void sdbCloseTable(void *handle) { free(pTable); } -int32_t sdbInitWorker() { +static int32_t sdbInitWorker() { tsSdbPool.num = 1; tsSdbPool.worker = calloc(sizeof(SSdbWorker), tsSdbPool.num); @@ -958,7 +862,7 @@ int32_t sdbInitWorker() { return 0; } -void sdbCleanupWorker() { +static void sdbCleanupWorker() { for (int32_t i = 0; i < tsSdbPool.num; ++i) { SSdbWorker *pWorker = tsSdbPool.worker + i; if (pWorker->thread) { @@ -979,7 +883,7 @@ void sdbCleanupWorker() { mInfo("vgId:1, sdb write is closed"); } -int32_t sdbAllocQueue() { +static int32_t sdbAllocQueue() { tsSdbWQueue = taosOpenQueue(); if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -1021,7 +925,7 @@ int32_t sdbAllocQueue() { return TSDB_CODE_SUCCESS; } -void sdbFreeQueue() { +static void sdbFreeQueue() { taosCloseQueue(tsSdbWQueue); taosFreeQall(tsSdbWQall); taosCloseQset(tsSdbWQset); @@ -1030,54 +934,96 @@ void sdbFreeQueue() { tsSdbWQueue = NULL; } -int32_t sdbWriteToQueue(void *wparam, void *hparam, int32_t qtype, void *unsed) { - SWalHead *pHead = hparam; - int32_t size = sizeof(SWalHead) + pHead->len; - SWalHead *pWal = taosAllocateQitem(size); - memcpy(pWal, pHead, size); +static int32_t sdbWriteToQueue(SSWriteMsg *pWrite, int32_t qtype) { + SWalHead *pHead = pWrite->pHead; - taosWriteQitem(tsSdbWQueue, qtype, pWal); - return 0; + if (pHead->len > TSDB_MAX_WAL_SIZE) { + sdbError("vgId:1, wal len:%d exceeds limit, hver:%" PRIu64, pHead->len, pHead->version); + taosFreeQitem(pWrite); + return TSDB_CODE_WAL_SIZE_LIMIT; + } + + int32_t queued = atomic_add_fetch_32(&tsSdbMgmt.queuedMsg, 1); + if (queued > MAX_QUEUED_MSG_NUM) { + sdbDebug("vgId:1, too many msg:%d in sdb queue, flow control", queued); + taosMsleep(1); + } + + sdbIncRef(pWrite->pTable, pWrite->pRow); + + sdbTrace("vgId:1, msg:%p write into to sdb queue", pWrite->pMsg); + taosWriteQitem(tsSdbWQueue, qtype, pWrite); + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static void sdbFreeFromQueue(SSWriteMsg *pWrite) { + int32_t queued = atomic_sub_fetch_32(&tsSdbMgmt.queuedMsg, 1); + sdbTrace("vgId:1, msg:%p free from sdb queue, queued:%d", pWrite->pMsg, queued); + + sdbDecRef(pWrite->pTable, pWrite->pRow); + taosFreeQitem(pWrite); +} + +static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { + SWalHead *pHead = wparam; + + int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pHead->len; + SSWriteMsg *pWrite = taosAllocateQitem(size); + if (pWrite == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return sdbWriteToQueue(pWrite, qtype); } +static int32_t sdbWriteRowToQueue(SSWriteMsg *pInputWrite, int32_t action) { + SSdbTable *pTable = pInputWrite->pTable; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; + + int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize; + SSWriteMsg *pWrite = taosAllocateQitem(size); + if (pWrite == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + memcpy(pWrite, pInputWrite, sizeof(SSWriteMsg)); + pWrite->processedCount = 1; + + SWalHead *pHead = pWrite->pHead; + pWrite->rowData = pHead->cont; + (*pTable->fpEncode)(pWrite); + + pHead->len = pWrite->rowSize; + pHead->version = 0; + pHead->msgType = pTable->id * 10 + action; + + return sdbWriteToQueue(pWrite, TAOS_QTYPE_RPC); +} + +int32_t sdbInsertRowToQueue(SSWriteMsg *pWrite) { return sdbWriteRowToQueue(pWrite, SDB_ACTION_INSERT); } + static void *sdbWorkerFp(void *pWorker) { - SWalHead *pHead; SSWriteMsg *pWrite; - int32_t qtype; - int32_t numOfMsgs; - void * item; - void * unUsed; + int32_t qtype; + void * unUsed; while (1) { - numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); + int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); if (numOfMsgs == 0) { sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWQset); break; } for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &qtype, &item); - if (qtype == TAOS_QTYPE_RPC) { - pWrite = (SSWriteMsg *)item; - pWrite->processedCount = 1; - pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; - if (pWrite->pMsg != NULL) { - sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", - pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->name, pWrite->pRow, - sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version); - } - } else { - pHead = (SWalHead *)item; - pWrite = NULL; - } + taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite); + sdbTrace("vgId:1, msg:%p, row:%p hver:%" PRIu64 ", will be processed in sdb queue", pWrite->pMsg, pWrite->pRow, + pWrite->pHead->version); - int32_t code = sdbWrite(pWrite, pHead, qtype, NULL); - if (code > 0) code = 0; - if (pWrite) { - pWrite->code = code; - } else { - pHead->len = code; // hackway - } + pWrite->code = sdbProcessWrite((qtype == TAOS_QTYPE_RPC) ? pWrite : NULL, pWrite->pHead, qtype, NULL); + if (pWrite->code > 0) pWrite->code = 0; + + sdbTrace("vgId:1, msg:%p is processed in sdb queue, code:%x", pWrite->pMsg, pWrite->code); } walFsync(tsSdbMgmt.wal, true); @@ -1085,18 +1031,16 @@ static void *sdbWorkerFp(void *pWorker) { // browse all items, and process them one by one taosResetQitems(tsSdbWQall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(tsSdbWQall, &qtype, &item); + taosGetQitem(tsSdbWQall, &qtype, (void **)&pWrite); if (qtype == TAOS_QTYPE_RPC) { - pWrite = (SSWriteMsg *)item; sdbConfirmForward(NULL, pWrite, pWrite->code); } else if (qtype == TAOS_QTYPE_FWD) { - pHead = (SWalHead *)item; - syncConfirmForward(tsSdbMgmt.sync, pHead->version, pHead->len); - taosFreeQitem(item); + syncConfirmForward(tsSdbMgmt.sync, pWrite->pHead->version, pWrite->code); } else { - taosFreeQitem(item); } + + sdbFreeFromQueue(pWrite); } } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index a90b573576..587c766e44 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2418,7 +2418,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { .fpRsp = mnodeDoCreateChildTableCb }; - int32_t code = sdbInsertRowImp(&desc); + int32_t code = sdbInsertRowToQueue(&desc); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pTable = NULL; mnodeDestroyChildTable(pTable); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 2f0caea57a..2eb11e1def 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -966,7 +966,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { .fpRsp = mnodeCreateVgroupCb }; - int32_t code = sdbInsertRowImp(&wmsg); + int32_t code = sdbInsertRowToQueue(&wmsg); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index d3dbe3a32d..e7901e6eb8 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -156,7 +156,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); - if (lastVer != 0 && lastVer == pHead->version) { + if (lastVer == pHead->version) { sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer); break; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 591d7749ea..8fccb1442f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG -echo "mDebugFlag 143" >> $TAOS_CFG -echo "sdbDebugFlag 143" >> $TAOS_CFG -echo "dDebugFlag 131" >> $TAOS_CFG -echo "vDebugFlag 131" >> $TAOS_CFG -echo "tsdbDebugFlag 131" >> $TAOS_CFG -echo "cDebugFlag 131" >> $TAOS_CFG -echo "jnidebugFlag 131" >> $TAOS_CFG -echo "odbcdebugFlag 131" >> $TAOS_CFG -echo "httpDebugFlag 131" >> $TAOS_CFG -echo "monitorDebugFlag 131" >> $TAOS_CFG -echo "mqttDebugFlag 131" >> $TAOS_CFG -echo "qdebugFlag 131" >> $TAOS_CFG -echo "rpcDebugFlag 131" >> $TAOS_CFG +echo "mDebugFlag 135" >> $TAOS_CFG +echo "sdbDebugFlag 135" >> $TAOS_CFG +echo "dDebugFlag 135" >> $TAOS_CFG +echo "vDebugFlag 135" >> $TAOS_CFG +echo "tsdbDebugFlag 135" >> $TAOS_CFG +echo "cDebugFlag 135" >> $TAOS_CFG +echo "jnidebugFlag 135" >> $TAOS_CFG +echo "odbcdebugFlag 135" >> $TAOS_CFG +echo "httpDebugFlag 135" >> $TAOS_CFG +echo "monitorDebugFlag 135" >> $TAOS_CFG +echo "mqttDebugFlag 135" >> $TAOS_CFG +echo "qdebugFlag 135" >> $TAOS_CFG +echo "rpcDebugFlag 135" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG -echo "udebugFlag 131" >> $TAOS_CFG -echo "sdebugFlag 143" >> $TAOS_CFG -echo "wdebugFlag 143" >> $TAOS_CFG -echo "cqdebugFlag 131" >> $TAOS_CFG +echo "udebugFlag 135" >> $TAOS_CFG +echo "sdebugFlag 135" >> $TAOS_CFG +echo "wdebugFlag 135" >> $TAOS_CFG +echo "cqdebugFlag 135" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG @@ -140,7 +140,7 @@ echo "clog 2" >> $TAOS_CFG #echo "cache 1" >> $TAOS_CFG echo "days 10" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG -echo "maxVgroupsPerDb 10" >> $TAOS_CFG +echo "maxVgroupsPerDb 4" >> $TAOS_CFG echo "minTablesPerVnode 4" >> $TAOS_CFG echo "maxTablesPerVnode 1000" >> $TAOS_CFG echo "tableIncStepPerVnode 10000" >> $TAOS_CFG -- GitLab