diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b53b3003c18337b2abf892fdbb648366876f00fd..e25aa468bff66f1deb6cb089ed3887518483a805 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,12 +17,12 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); - return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } @@ -32,7 +32,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; - int32_t transId = sdbGetIdFromRaw(pRaw); + int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); pMgmt->errCode = cbMeta.code; mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw); @@ -67,24 +67,37 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { } } -int32_t mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len) { - /* +int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void **ppIter, char **ppBuf, int32_t *len) { SMnode *pMnode = pFsm->data; - SSdbIter *pIter; - if (iter == NULL) { - pIter = sdbIterInit(pMnode->sdb) + mInfo("start to read snapshot from sdb"); + + int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, ppBuf, len); + if (code != 0) { + mError("failed to read snapshot from sdb since %s", terrstr()); } else { - pIter = iter; + if (*ppIter == NULL) { + mInfo("successfully to read snapshot from sdb"); + } } - */ - return 0; + return code; } -int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len) { +int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; - sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); - return 0; + pMnode->syncMgmt.restored = false; + mInfo("start to apply snapshot to sdb, len:%d", len); + + int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); + if (code != 0) { + mError("failed to apply snapshot to sdb, len:%d", len); + } else { + mInfo("successfully to apply snapshot to sdb, len:%d", len); + pMnode->syncMgmt.restored = true; + } + + // taosMemoryFree(pBuf); + return code; } void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { @@ -116,7 +129,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpSnapshotRead = mndSnapshotRead; pFsm->FpSnapshotApply = mndSnapshotApply; pFsm->FpReConfigCb = mndReConfig; - + return pFsm; } @@ -150,8 +163,7 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; - mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, - pMgmt->standby); + mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby); for (int32_t i = 0; i < pMnode->replica; ++i) { SNodeInfo *pNode = &pCfg->nodeInfo[i]; tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3932defd8db79e0a83cfec078c01e8a97e0ec3d5..32e29f5e1c6082e8a3823b2f180d90f997fe73ac 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -186,7 +186,7 @@ typedef struct SSdb { typedef struct SSdbIter { TdFilePtr file; - int64_t readlen; + int64_t total; } SSdbIter; typedef struct { @@ -380,13 +380,12 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); -SSdbIter *sdbIterInit(SSdb *pSdb); -SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); +int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len); +int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len); const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); - -int32_t sdbGetIdFromRaw(SSdbRaw *pRaw); +int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw); #ifdef __cplusplus } diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h deleted file mode 100644 index c49d6e8fb287619d9503282dd2e164ed432ce823..0000000000000000000000000000000000000000 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_SDB_INT_H_ -#define _TD_SDB_INT_H_ - -#include "os.h" - -#include "sdb.h" - -#ifdef __cplusplus -extern "C" { -#endif - -// clang-format off -#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} -#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -// clang-format on - -typedef struct SSdbRaw { - int8_t type; - int8_t status; - int8_t sver; - int8_t reserved; - int32_t dataLen; - char pData[]; -} SSdbRaw; - -typedef struct SSdbRow { - ESdbType type; - ESdbStatus status; - int32_t refCount; - char pObj[]; -} SSdbRow; - -const char *sdbTableName(ESdbType type); -void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); - -void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_SDB_INT_H_*/ diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 25cda199568592ba809e76c92e32107a30a163da..46abd5e8b2957f8813666a4553a7e3d37998cd8d 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -393,7 +393,7 @@ int32_t sdbDeploy(SSdb *pSdb) { return 0; } -SSdbIter *sdbIterInit(SSdb *pSdb) { +static SSdbIter *sdbOpenIter(SSdb *pSdb) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); @@ -414,44 +414,140 @@ SSdbIter *sdbIterInit(SSdb *pSdb) { pIter->file = taosOpenFile(tmpfile, TD_FILE_READ); if (pIter->file == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to read snapshot file:%s since %s", tmpfile, terrstr()); + mError("failed to read file:%s since %s", tmpfile, terrstr()); taosMemoryFree(pIter); return NULL; } - mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter); return pIter; } -SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) { +static void sdbCloseIter(SSdbIter *pIter) { + if (pIter == NULL) return; + if (pIter->file != NULL) { + taosCloseFile(&pIter->file); + } + taosMemoryFree(pIter); + mInfo("sdbiter:%p, is closed", pIter); +} + +static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { + SSdbIter *pIter = NULL; + if (ppIter != NULL) pIter = *ppIter; + + if (pIter == NULL) { + pIter = sdbOpenIter(pSdb); + if (pIter != NULL) { + mInfo("sdbiter:%p, is created to read snapshot", pIter); + *ppIter = pIter; + } else { + mError("failed to create sdbiter to read snapshot since %s", terrstr()); + *ppIter = NULL; + return NULL; + } + } else { + mInfo("sdbiter:%p, continue to read snapshot, total:%" PRId64, pIter, pIter->total); + } + + return pIter; +} + +int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { const int32_t maxlen = 100; + SSdbIter *pIter = sdbGetIter(pSdb, ppIter); + if (pIter == NULL) return -1; + char *pBuf = taosMemoryCalloc(1, maxlen); if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + sdbCloseIter(pIter); + return -1; } int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen); - if (readlen == 0) { - mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen); - taosMemoryFree(pBuf); - taosCloseFile(&pIter->file); - taosMemoryFree(pIter); - pIter = NULL; - } else if (readlen < 0) { + if (readlen < 0 || (readlen == 0 && errno != 0)) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen); + mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total); + *ppBuf = NULL; + *len = 0; + *ppIter = NULL; + sdbCloseIter(pIter); taosMemoryFree(pBuf); - taosCloseFile(&pIter->file); - taosMemoryFree(pIter); - pIter = NULL; - } else { - pIter->readlen += readlen; - mTrace("read snapshot, readlen:%" PRId64, pIter->readlen); + return -1; + } else if (readlen == 0) { + mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total); + *ppBuf = NULL; + *len = 0; + *ppIter = NULL; + sdbCloseIter(pIter); + taosMemoryFree(pBuf); + return 0; + } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { + pIter->total += readlen; + mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total); + *ppBuf = pBuf; + *len = readlen; + return 0; + } else if (readlen < maxlen && errno == 0) { + mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total); *ppBuf = pBuf; - *buflen = readlen; + *len = readlen; + *ppIter = NULL; + sdbCloseIter(pIter); + return 0; + } else { + // impossible + mError("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total); + *ppBuf = NULL; + *len = 0; + *ppIter = NULL; + sdbCloseIter(pIter); + taosMemoryFree(pBuf); + return -1; } - - return pIter; } + +int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) { + char datafile[PATH_MAX] = {0}; + char tmpfile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + + TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to write %s since %s", tmpfile, terrstr()); + return -1; + } + + int32_t writelen = taosWriteFile(pFile, pBuf, len); + if (writelen != len) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to write %s since %s", tmpfile, terrstr()); + taosCloseFile(&pFile); + return -1; + } + + if (taosFsyncFile(pFile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to fsync %s since %s", tmpfile, terrstr()); + taosCloseFile(&pFile); + return -1; + } + + (void)taosCloseFile(&pFile); + + if (taosRenameFile(tmpfile, datafile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to rename file %s to %s since %s", tmpfile, datafile, terrstr()); + return -1; + } + + if (sdbReadFile(pSdb) != 0) { + mError("failed to read from %s since %s", datafile, terrstr()); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 4b61ebb627622bfdb1bb32a1df591d564a7c7b01..90643a54a9de42d4f505fdcb4f1d25ef95b80ac7 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -16,9 +16,14 @@ #define _DEFAULT_SOURCE #include "sdb.h" -int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) { - int32_t id = *((int32_t *)(pRaw->pData)); - return id; +int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw) { + EKeyType keytype = pSdb->keyTypes[pRaw->type]; + if (keytype == SDB_KEY_INT32) { + int32_t id = *((int32_t *)(pRaw->pData)); + return id; + } else { + return -2; + } } SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {