diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 94d41a7416679f496e2324e033ef667e262f3b1c..a9e2f5a71ab84c3fae02daebc8ee4389d40b7a56 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -44,12 +44,9 @@ extern "C" { } #define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t) - #define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) - #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) - -#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) +#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ { \ @@ -66,11 +63,8 @@ extern "C" { } #define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t) - #define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) - #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) - #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ @@ -356,6 +350,14 @@ typedef struct SSdb { SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; +typedef struct SSdbIter { + TdFilePtr file; + int64_t readlen; +} SSdbIter; + +SSdbIter *sdbIterInit(SSdb *pSdb); +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 1c400215f99d0ba67404f4d0906346f6f5720d40..fcbb26205d4903384090442db8b22bd34f599b89 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -76,12 +76,12 @@ typedef struct { typedef struct { SWal *pWal; - int32_t errCode; - bool restored; sem_t syncSem; int64_t sync; ESyncState state; bool isStandBy; + bool restored; + int32_t errCode; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 868558525addc532c531a3d8d38538936377ec35..b502865d36cefef045d0f4a50870def4e39aaa3c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -22,17 +22,15 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - SSdb *pSdb = pMnode->pSdb; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pFsm->data; + SSdbRaw *pRaw = pMsg->pCont; mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pSdb, pRaw); - sdbSetApplyIndex(pSdb, cbMeta.index); - sdbSetApplyTerm(pSdb, cbMeta.term); + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); + tsem_post(&pMnode->syncMgmt.syncSem); } } @@ -49,20 +47,15 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { pMnode->syncMgmt.restored = true; } -void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len) { - /* - SMnode *pMnode = pFsm->data; - SSdbIter *pIter; - if (iter == NULL) { - pIter = sdbIterInit(pMnode->sdb) - } else { - pIter = iter; +void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) { + SMnode *pMnode = pFsm->data; + SSdbIter *pIter = iter; + + if (iter == NULL) { + pIter = sdbIterInit(pMnode->pSdb); } - pIter = sdbIterRead(pIter, ppBuf, len); - return pIter; - */ - return NULL; + return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len); } int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) { diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b000c208c87b0393616cf0fb1d4a0cdbc08782b7..eac7f4af5d961eae7f061a9312cf105f3600b823 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) { return 0; } + +SSdbIter *sdbIterInit(SSdb *pSdb) { + char datafile[PATH_MAX] = {0}; + char tmpfile[PATH_MAX] = {0}; + snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + + if (taosCopyFile(datafile, tmpfile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); + return NULL; + } + + SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->file = taosOpenFile(tmpfile, TD_FILE_READ); + if (pIter->file == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot file:%s since %s", tmpfile, terrstr()); + taosMemoryFree(pIter); + return NULL; + } + + mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter); + return pIter; +} + +SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) { + const int32_t maxlen = 100; + + char *pBuf = taosMemoryCalloc(1, maxlen); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen); + if (readlen == 0) { + mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else if (readlen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen); + taosMemoryFree(pBuf); + taosCloseFile(&pIter->file); + taosMemoryFree(pIter); + pIter = NULL; + } else { + pIter->readlen += readlen; + mTrace("read snapshot, readlen:%" PRId64, pIter->readlen); + *ppBuf = pBuf; + *buflen = readlen; + } + + return pIter; +}