未验证 提交 3fbbc50c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13112 from taosdata/fix/mnode

enh: sdb snapshot
......@@ -17,12 +17,12 @@
#include "mndSync.h"
#include "mndTrans.h"
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
}
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
......@@ -32,7 +32,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
int32_t transId = sdbGetIdFromRaw(pRaw);
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code;
mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId,
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw);
......@@ -67,24 +67,37 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
int32_t mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len) {
/*
int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void **ppIter, char **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data;
SSdbIter *pIter;
if (iter == NULL) {
pIter = sdbIterInit(pMnode->sdb)
mInfo("start to read snapshot from sdb");
int32_t code = sdbReadSnapshot(pMnode->pSdb, (SSdbIter **)ppIter, ppBuf, len);
if (code != 0) {
mError("failed to read snapshot from sdb since %s", terrstr());
} else {
pIter = iter;
if (*ppIter == NULL) {
mInfo("successfully to read snapshot from sdb");
}
}
*/
return 0;
return code;
}
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len) {
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data;
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0;
pMnode->syncMgmt.restored = false;
mInfo("start to apply snapshot to sdb, len:%d", len);
int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len);
if (code != 0) {
mError("failed to apply snapshot to sdb, len:%d", len);
} else {
mInfo("successfully to apply snapshot to sdb, len:%d", len);
pMnode->syncMgmt.restored = true;
}
// taosMemoryFree(pBuf);
return code;
}
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
......@@ -116,7 +129,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpSnapshotRead = mndSnapshotRead;
pFsm->FpSnapshotApply = mndSnapshotApply;
pFsm->FpReConfigCb = mndReConfig;
return pFsm;
}
......@@ -150,8 +163,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex,
pMgmt->standby);
mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby);
for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
......
......@@ -186,7 +186,7 @@ typedef struct SSdb {
typedef struct SSdbIter {
TdFilePtr file;
int64_t readlen;
int64_t total;
} SSdbIter;
typedef struct {
......@@ -380,13 +380,12 @@ SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
SSdbIter *sdbIterInit(SSdb *pSdb);
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len);
int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len);
const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw);
int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_SDB_INT_H_
#define _TD_SDB_INT_H_
#include "os.h"
#include "sdb.h"
#ifdef __cplusplus
extern "C" {
#endif
// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
typedef struct SSdbRaw {
int8_t type;
int8_t status;
int8_t sver;
int8_t reserved;
int32_t dataLen;
char pData[];
} SSdbRaw;
typedef struct SSdbRow {
ESdbType type;
ESdbStatus status;
int32_t refCount;
char pObj[];
} SSdbRow;
const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
#ifdef __cplusplus
}
#endif
#endif /*_TD_SDB_INT_H_*/
......@@ -393,7 +393,7 @@ int32_t sdbDeploy(SSdb *pSdb) {
return 0;
}
SSdbIter *sdbIterInit(SSdb *pSdb) {
static SSdbIter *sdbOpenIter(SSdb *pSdb) {
char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
......@@ -414,44 +414,140 @@ SSdbIter *sdbIterInit(SSdb *pSdb) {
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
if (pIter->file == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
mError("failed to read file:%s since %s", tmpfile, terrstr());
taosMemoryFree(pIter);
return NULL;
}
mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
return pIter;
}
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
static void sdbCloseIter(SSdbIter *pIter) {
if (pIter == NULL) return;
if (pIter->file != NULL) {
taosCloseFile(&pIter->file);
}
taosMemoryFree(pIter);
mInfo("sdbiter:%p, is closed", pIter);
}
static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
SSdbIter *pIter = NULL;
if (ppIter != NULL) pIter = *ppIter;
if (pIter == NULL) {
pIter = sdbOpenIter(pSdb);
if (pIter != NULL) {
mInfo("sdbiter:%p, is created to read snapshot", pIter);
*ppIter = pIter;
} else {
mError("failed to create sdbiter to read snapshot since %s", terrstr());
*ppIter = NULL;
return NULL;
}
} else {
mInfo("sdbiter:%p, continue to read snapshot, total:%" PRId64, pIter, pIter->total);
}
return pIter;
}
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) {
const int32_t maxlen = 100;
SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
if (pIter == NULL) return -1;
char *pBuf = taosMemoryCalloc(1, maxlen);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
sdbCloseIter(pIter);
return -1;
}
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
if (readlen == 0) {
mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else if (readlen < 0) {
if (readlen < 0 || (readlen == 0 && errno != 0)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
*ppBuf = NULL;
*len = 0;
*ppIter = NULL;
sdbCloseIter(pIter);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else {
pIter->readlen += readlen;
mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
return -1;
} else if (readlen == 0) {
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
*ppBuf = NULL;
*len = 0;
*ppIter = NULL;
sdbCloseIter(pIter);
taosMemoryFree(pBuf);
return 0;
} else if ((readlen < maxlen && errno != 0) || readlen == maxlen) {
pIter->total += readlen;
mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
*ppBuf = pBuf;
*len = readlen;
return 0;
} else if (readlen < maxlen && errno == 0) {
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
*ppBuf = pBuf;
*buflen = readlen;
*len = readlen;
*ppIter = NULL;
sdbCloseIter(pIter);
return 0;
} else {
// impossible
mError("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
*ppBuf = NULL;
*len = 0;
*ppIter = NULL;
sdbCloseIter(pIter);
taosMemoryFree(pBuf);
return -1;
}
return pIter;
}
int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) {
char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to write %s since %s", tmpfile, terrstr());
return -1;
}
int32_t writelen = taosWriteFile(pFile, pBuf, len);
if (writelen != len) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to write %s since %s", tmpfile, terrstr());
taosCloseFile(&pFile);
return -1;
}
if (taosFsyncFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to fsync %s since %s", tmpfile, terrstr());
taosCloseFile(&pFile);
return -1;
}
(void)taosCloseFile(&pFile);
if (taosRenameFile(tmpfile, datafile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to rename file %s to %s since %s", tmpfile, datafile, terrstr());
return -1;
}
if (sdbReadFile(pSdb) != 0) {
mError("failed to read from %s since %s", datafile, terrstr());
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -16,9 +16,14 @@
#define _DEFAULT_SOURCE
#include "sdb.h"
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) {
int32_t id = *((int32_t *)(pRaw->pData));
return id;
int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw) {
EKeyType keytype = pSdb->keyTypes[pRaw->type];
if (keytype == SDB_KEY_INT32) {
int32_t id = *((int32_t *)(pRaw->pData));
return id;
} else {
return -2;
}
}
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册