From 9d6e37762319eea1425cb83cb8cdbb6c82149e2f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 18 Nov 2020 14:25:16 +0800 Subject: [PATCH] TD-1898 --- src/mnode/src/mnodeSdb.c | 130 ++++++++++++++++++------------------ src/sync/inc/syncInt.h | 2 +- src/sync/src/syncMain.c | 31 +++++---- src/sync/src/syncRestore.c | 2 +- src/sync/src/syncRetrieve.c | 2 +- src/wal/src/walWrite.c | 4 +- 6 files changed, 86 insertions(+), 85 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 003ecd0d24..9c11fa80e8 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -166,7 +166,7 @@ static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { } } -static char *sdbGetKeyStrFromObj(SSdbTable *pTable, void *key) { +static char *sdbGetObjStr(SSdbTable *pTable, void *key) { return sdbGetKeyStr(pTable, sdbGetObjKey(pTable, key)); } @@ -176,18 +176,18 @@ static void *sdbGetTableFromId(int32_t tableId) { static int32_t sdbInitWal() { SWalCfg walCfg = {.vgId = 1, .walLevel = TAOS_WAL_FSYNC, .keep = TAOS_WAL_KEEP, .fsyncPeriod = 0}; - char temp[TSDB_FILENAME_LEN]; + char temp[TSDB_FILENAME_LEN] = {0}; sprintf(temp, "%s/wal", tsMnodeDir); tsSdbObj.wal = walOpen(temp, &walCfg); if (tsSdbObj.wal == NULL) { - sdbError("failed to open sdb wal in %s", tsMnodeDir); + sdbError("vgId:1, failed to open wal in %s", tsMnodeDir); return -1; } - sdbInfo("open sdb wal for restore"); + sdbInfo("vgId:1, open wal for restore"); int code = walRestore(tsSdbObj.wal, NULL, sdbWrite); if (code != TSDB_CODE_SUCCESS) { - sdbError("failed to open wal for restore, reason:%s", tstrerror(code)); + sdbError("vgId:1, failed to open wal for restore since %s", tstrerror(code)); return -1; } return 0; @@ -205,10 +205,10 @@ static void sdbRestoreTables() { totalRows += pTable->numOfRows; numOfTables++; - sdbDebug("table:%s, is restored, numOfRows:%" PRId64, pTable->tableName, pTable->numOfRows); + sdbDebug("vgId:1, sdb:%s is restored, rows:%" PRId64, pTable->tableName, pTable->numOfRows); } - sdbInfo("sdb is restored, ver:%" PRId64 " totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables); + sdbInfo("vgId:1, sdb is restored, mver:%" PRIu64 " rows:%d tables:%d", tsSdbObj.version, totalRows, numOfTables); } void sdbUpdateMnodeRoles() { @@ -217,12 +217,12 @@ void sdbUpdateMnodeRoles() { SNodesRole roles = {0}; syncGetNodesRole(tsSdbObj.sync, &roles); - sdbInfo("update mnodes sync roles, total:%d", tsSdbObj.cfg.replica); + sdbInfo("vgId:1, update mnodes roles, replica:%d", tsSdbObj.cfg.replica); for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) { SMnodeObj *pMnode = mnodeGetMnode(roles.nodeId[i]); if (pMnode != NULL) { pMnode->role = roles.role[i]; - sdbInfo("mnode:%d, role:%s", pMnode->mnodeId, mnodeGetMnodeRoleStr(pMnode->role)); + sdbInfo("vgId:1, mnode:%d, role:%s", pMnode->mnodeId, mnodeGetMnodeRoleStr(pMnode->role)); if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbObj.role = pMnode->role; mnodeDecMnodeRef(pMnode); } @@ -242,7 +242,7 @@ static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { } static void sdbNotifyRole(void *ahandle, int8_t role) { - sdbInfo("mnode role changed from %s to %s", mnodeGetMnodeRoleStr(tsSdbObj.role), mnodeGetMnodeRoleStr(role)); + sdbInfo("vgId:1, mnode role changed from %s to %s", mnodeGetMnodeRoleStr(tsSdbObj.role), mnodeGetMnodeRoleStr(role)); if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) { balanceReset(); @@ -262,24 +262,21 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); if (processedCount <= 1) { if (pMsg != NULL) { - sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d result:%s", pMsg->rpcMsg.ahandle, pMsg, - processedCount, tstrerror(code)); + sdbDebug("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, processedCount, code); } return; } if (pMsg != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func, result:%s", pMsg->rpcMsg.ahandle, pMsg, - tstrerror(code)); + sdbDebug("vgId:1, msg:%p is confirmed, code:%x", pMsg, code); } // failed to forward, need revert insert if (pOper->retCode != TSDB_CODE_SUCCESS) { SWalHead *pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; int32_t action = pHead->msgType % 10; - sdbError("table:%s record:%p:%s ver:%" PRIu64 ", action:%d failed to foward reason:%s", - ((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont), - pHead->version, action, tstrerror(pOper->retCode)); + sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pOper->pObj, + sdbGetKeyStr(pOper->table, pHead->cont), pHead->version, action, tstrerror(pOper->retCode)); if (action == SDB_ACTION_INSERT) { // It's better to create a table in two stages, create it first and then set it success //sdbDeleteHash(pOper->table, pOper); @@ -314,11 +311,11 @@ void sdbUpdateAsync() { void sdbUpdateSync(void *pMnodes) { SMnodeInfos *mnodes = pMnodes; if (!mnodeIsRunning()) { - mDebug("mnode not start yet, update sync config later"); + mDebug("vgId:1, mnode not start yet, update sync config later"); return; } - mDebug("update sync config in sync module, mnodes:%p", pMnodes); + mDebug("vgId:1, update sync config in sync module, mnodes:%p", pMnodes); SSyncCfg syncCfg = {0}; int32_t index = 0; @@ -344,7 +341,7 @@ void sdbUpdateSync(void *pMnodes) { } sdbFreeIter(pIter); syncCfg.replica = index; - mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); + mDebug("vgId:1, mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); } else { for (index = 0; index < mnodes->mnodeNum; ++index) { SMnodeInfo *node = &mnodes->mnodeInfos[index]; @@ -353,7 +350,7 @@ void sdbUpdateSync(void *pMnodes) { syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; } syncCfg.replica = index; - mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica); + mDebug("vgId:1, mnodes info input, numOfMnodes:%d", syncCfg.replica); } syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; @@ -367,18 +364,19 @@ void sdbUpdateSync(void *pMnodes) { } if (!hasThisDnode) { - sdbDebug("update sync config, this dnode not exist"); + sdbDebug("vgId:1, update sync config, this dnode not exist"); return; } if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) { - sdbDebug("update sync config, info not changed"); + sdbDebug("vgId:1, update sync config, info not changed"); return; } - sdbInfo("work as mnode, replica:%d", syncCfg.replica); + sdbInfo("vgId:1, work as mnode, replica:%d", syncCfg.replica); for (int32_t i = 0; i < syncCfg.replica; ++i) { - sdbInfo("mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, syncCfg.nodeInfo[i].nodePort); + sdbInfo("vgId:1, mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, + syncCfg.nodeInfo[i].nodePort); } SSyncInfo syncInfo = {0}; @@ -427,9 +425,9 @@ void sdbCleanUp() { if (tsSdbObj.status != SDB_STATUS_SERVING) return; tsSdbObj.status = SDB_STATUS_CLOSING; - + sdbCleanupWriteWorker(); - sdbDebug("sdb will be closed, ver:%" PRId64, tsSdbObj.version); + sdbDebug("vgId:1, sdb will be closed, mver:%" PRIu64, tsSdbObj.version); if (tsSdbObj.sync) { syncStop(tsSdbObj.sync); @@ -450,7 +448,7 @@ void sdbIncRef(void *handle, void *pObj) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t refCount = atomic_add_fetch_32(pRefCount, 1); - sdbTrace("add ref to table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount); + sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); } void sdbDecRef(void *handle, void *pObj) { @@ -459,11 +457,11 @@ void sdbDecRef(void *handle, void *pObj) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - sdbTrace("def ref of table:%s record:%p:%s:%d", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount); + sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); int32_t *updateEnd = pObj + pTable->refCountPos - 4; if (refCount <= 0 && *updateEnd) { - sdbTrace("table:%s, record:%p:%s:%d is destroyed", pTable->tableName, pObj, sdbGetKeyStrFromObj(pTable, pObj), refCount); + sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); SSdbOper oper = {.pObj = pObj}; (*pTable->destroyFp)(&oper); } @@ -523,13 +521,13 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { atomic_add_fetch_32(&pTable->autoIndex, 1); } - sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, pOper->pMsg); + sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, pOper->pMsg); int32_t code = (*pTable->insertFp)(pOper); if (code != TSDB_CODE_SUCCESS) { - sdbError("table:%s, failed to insert record:%s to hash, remove it", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj)); sdbDeleteHash(pTable, pOper); } @@ -540,8 +538,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { int32_t *updateEnd = pOper->pObj + pTable->refCountPos - 4; bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; if (!set) { - sdbError("table:%s, failed to delete record:%s from hash, for it already removed", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj)); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } @@ -558,9 +556,9 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { pthread_mutex_unlock(&pTable->mutex); atomic_sub_fetch_32(&pTable->numOfRows, 1); - - sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg); + + sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg); sdbDecRef(pTable, pOper->pObj); @@ -568,8 +566,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { } static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { - sdbDebug("table:%s, update record:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg); + sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg); (*pTable->updateFp)(pOper); return TSDB_CODE_SUCCESS; @@ -594,12 +592,12 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { // for data from WAL or forward, version may be smaller if (pHead->version <= tsSdbObj.version) { pthread_mutex_unlock(&tsSdbObj.mutex); - sdbDebug("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64, + sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); - sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64, + sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_SYN_INVALID_VERSION; } else { @@ -623,19 +621,19 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { if (syncCode <= 0) pOper->processedCount = 1; if (syncCode < 0) { - sdbError("table:%s, failed to forward request, result:%s action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName, + sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg); } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName, + sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg); } else { - sdbTrace("table:%s, no need to send fwd request, action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName, + sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg); } return syncCode; } - sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s ver:%" PRId64, pTable->tableName, + sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync @@ -649,7 +647,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { } else if (action == SDB_ACTION_DELETE) { void *pRow = sdbGetRowMeta(pTable, pHead->cont); if (pRow == NULL) { - sdbDebug("table:%s, object:%s not exist in hash, ignore delete action", pTable->tableName, + sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->tableName, sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } @@ -658,7 +656,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { } else if (action == SDB_ACTION_UPDATE) { void *pRow = sdbGetRowMeta(pTable, pHead->cont); if (pRow == NULL) { - sdbDebug("table:%s, object:%s not exist in hash, ignore update action", pTable->tableName, + sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->tableName, sdbGetKeyStr(pTable, pHead->cont)); return TSDB_CODE_SUCCESS; } @@ -675,8 +673,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (sdbGetRowFromObj(pTable, pOper->pObj)) { - sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, - sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbError("vgId:1, sdb:%s, failed to insert key:%s, already exist", pTable->tableName, + sdbGetObjStr(pTable, pOper->pObj)); sdbDecRef(pTable, pOper->pObj); return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; } @@ -692,7 +690,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { int32_t code = sdbInsertHash(pTable, pOper); if (code != TSDB_CODE_SUCCESS) { - sdbError("table:%s, failed to insert into hash", pTable->tableName); + sdbError("vgId:1, sdb:%s, failed to insert into hash", pTable->tableName); return code; } @@ -727,8 +725,8 @@ int32_t sdbInsertRowImp(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj)); } sdbIncRef(pNewOper->table, pNewOper->pObj); @@ -751,7 +749,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pRow == NULL) { - sdbDebug("table:%s, record is not there, delete failed", pTable->tableName); + sdbDebug("vgId:1, sdb:%s, record is not there, delete failed", pTable->tableName); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } @@ -759,7 +757,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { int32_t code = sdbDeleteHash(pTable, pOper); if (code != TSDB_CODE_SUCCESS) { - sdbError("table:%s, failed to delete from hash", pTable->tableName); + sdbError("vgId:1, sdb:%s, failed to delete from hash", pTable->tableName); sdbDecRef(pTable, pOper->pObj); return code; } @@ -795,8 +793,8 @@ int32_t sdbDeleteRowImp(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj)); } taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); @@ -810,13 +808,13 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pRow == NULL) { - sdbDebug("table:%s, record is not there, update failed", pTable->tableName); + sdbDebug("vgId:1, sdb:%s, record is not there, update failed", pTable->tableName); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } int32_t code = sdbUpdateHash(pTable, pOper); if (code != TSDB_CODE_SUCCESS) { - sdbError("table:%s, failed to update hash", pTable->tableName); + sdbError("vgId:1, sdb:%s, failed to update hash", pTable->tableName); return code; } @@ -850,8 +848,8 @@ int32_t sdbUpdateRowImp(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, - pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, + pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetObjStr(pTable, pOper->pObj)); } sdbIncRef(pNewOper->table, pNewOper->pObj); @@ -948,7 +946,7 @@ void sdbCloseTable(void *handle) { taosHashCleanup(pTable->iHandle); pthread_mutex_destroy(&pTable->mutex); - sdbDebug("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables); + sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables); free(pTable); } @@ -964,7 +962,7 @@ int32_t sdbInitWriteWorker() { sdbAllocWriteQueue(); - mInfo("sdb write is opened"); + mInfo("vgId:1, sdb write is opened"); return 0; } @@ -986,7 +984,7 @@ void sdbCleanupWriteWorker() { sdbFreeWritequeue(); tfree(tsSdbPool.writeWorker); - mInfo("sdb write is closed"); + mInfo("vgId:1, sdb write is closed"); } int32_t sdbAllocWriteQueue() { @@ -1072,7 +1070,7 @@ static void *sdbWorkerFp(void *param) { pOper->processedCount = 1; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; if (pOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s ver:%" PRIu64 ", will be processed in sdb queue", + sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); } diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7156a2d08a..309d5b1a75 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -106,7 +106,7 @@ typedef struct { int8_t nacks; int8_t confirmed; int32_t code; - uint64_t time; + int64_t time; } SFwdInfo; typedef struct { diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 6ff04aad64..5c56e90af6 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -689,7 +689,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (pMaster) { // master is there pNode->pMaster = pMaster; - sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version); + sDebug("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version); if (syncValidateMaster(pPeer) < 0) return; @@ -697,7 +697,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (nodeVersion < pMaster->version) { syncRequired = 1; } else { - sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); + sInfo("%s is master, work as slave, sver:%" PRIu64, pMaster->id, pMaster->version); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); } @@ -854,7 +854,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo * pFwdInfo; - sDebug("%s, forward-rsp is received, code:%x ver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); + sDebug("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { @@ -891,7 +891,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; - sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, + sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); pPeer->version = pPeersStatus->version; @@ -979,7 +979,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen); if (retLen == statusMsgLen) { - sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], + sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); } else { sDebug("%s, failed to send status msg, restart", pPeer->id); @@ -1154,7 +1154,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { pFwdInfo->time = time; pSyncFwds->fwds++; - sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); + sDebug("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); } static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { @@ -1168,7 +1168,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->fwds--; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; - // sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", + // sDebug("vgId:%d, fwd info is removed, hver:%d, fwds:%d", // pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); memset(pFwdInfo, 0, sizeof(SFwdInfo)); } @@ -1191,7 +1191,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } if (confirm && pFwdInfo->confirmed == 0) { - sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); + sDebug("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); pFwdInfo->confirmed = 1; } @@ -1204,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; - if (pSyncFwds) {; - uint64_t time = taosGetTimestampMs(); + if (pSyncFwds) { + int64_t time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&(pNode->mutex)); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; - if (time - pFwdInfo->time < 2000) break; + if (ABS(time - pFwdInfo->time) < 2000) break; + + sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, + pFwdInfo->version, time, pFwdInfo->time); syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); } @@ -1234,7 +1237,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (pWalHead->version > nodeVersion + 1) { - sError("vgId:%d, hver:%" PRIu64 ", inconsistent with ver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); + sError("vgId:%d, hver:%" PRIu64 ", inconsistent with sver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { sInfo("vgId:%d, restart connection", pNode->vgId); for (int32_t i = 0; i < pNode->replica; ++i) { @@ -1277,9 +1280,9 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); if (retLen == fwdLen) { - sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + sDebug("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); } else { - sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); syncRestartConnection(pPeer); } } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 5d7b9eac9b..33bd96ebb3 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -214,7 +214,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { memcpy(pRecv->offset, pHead, len); pRecv->offset += len; pRecv->forwards++; - sDebug("%s, fwd is saved into queue, ver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); + sDebug("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); } else { sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize); pRecv->code = -1; // set error code diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 21151f1199..52d1bded31 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -268,7 +268,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi break; } - sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); + sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); if (ret != wsize) break; pPeer->sversion = pHead->version; diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 72464d4309..36b3dba165 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -144,9 +144,9 @@ void walFsync(void *handle, bool forceFsync) { if (pWal == NULL || pWal->fd < 0) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, file:%s, do fsync", pWal->vgId, pWal->name); + wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); if (fsync(pWal->fd) < 0) { - wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); } } } -- GitLab