未验证 提交 f904401d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9574 from taosdata/feature/dnode3

mnode integration with wal module
......@@ -102,25 +102,24 @@ typedef enum {
} ESdbStatus;
typedef enum {
SDB_START = 0,
SDB_TRANS = 1,
SDB_CLUSTER = 2,
SDB_MNODE = 3,
SDB_QNODE = 4,
SDB_SNODE = 5,
SDB_BNODE = 6,
SDB_DNODE = 7,
SDB_USER = 8,
SDB_AUTH = 9,
SDB_ACCT = 10,
SDB_CONSUMER = 11,
SDB_CGROUP = 12,
SDB_TOPIC = 13,
SDB_VGROUP = 14,
SDB_STB = 15,
SDB_DB = 16,
SDB_FUNC = 17,
SDB_MAX = 18
SDB_TRANS = 0,
SDB_CLUSTER = 1,
SDB_MNODE = 2,
SDB_QNODE = 3,
SDB_SNODE = 4,
SDB_BNODE = 5,
SDB_DNODE = 6,
SDB_USER = 7,
SDB_AUTH = 8,
SDB_ACCT = 9,
SDB_CONSUMER = 10,
SDB_CGROUP = 11,
SDB_TOPIC = 12,
SDB_VGROUP = 13,
SDB_STB = 14,
SDB_DB = 15,
SDB_FUNC = 16,
SDB_MAX = 17
} ESdbType;
typedef struct SSdb SSdb;
......@@ -188,6 +187,14 @@ int32_t sdbDeploy(SSdb *pSdb);
*/
int32_t sdbReadFile(SSdb *pSdb);
/**
* @brief Write sdb file.
*
* @param pSdb The sdb object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t sdbWriteFile(SSdb *pSdb);
/**
* @brief Parse and write raw data to sdb, then free the pRaw object
*
......@@ -260,7 +267,7 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
*
* @param pSdb The sdb object.
* @param pIter The type of the table.
* @record int32_t The number of rows in the table
* @return int32_t The number of rows in the table
*/
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
......@@ -269,10 +276,19 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
*
* @param pSdb The sdb object.
* @param pIter The type of the table.
* @record int32_t The max id of the table
* @return int32_t The max id of the table
*/
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
/**
* @brief Update the version of sdb
*
* @param pSdb The sdb object.
* @param val The update value of the version.
* @return int32_t The current version of sdb
*/
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
......
......@@ -160,6 +160,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B)
#define TSDB_CODE_SDB_INVALID_WAl_VER TAOS_DEF_ERROR_CODE(0, 0x033C)
// mnode-dnode
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
......
enable_testing()
add_subdirectory(acct)
# add_subdirectory(auth)
# add_subdirectory(balance)
add_subdirectory(cluster)
......@@ -17,7 +16,6 @@ add_subdirectory(stb)
# add_subdirectory(sync)
# add_subdirectory(telem)
# add_subdirectory(trans)
add_subdirectory(user)
add_subdirectory(vgroup)
add_subdirectory(sut)
......@@ -8,7 +8,12 @@ target_include_directories(
target_link_libraries(
mnode
PRIVATE sdb
PRIVATE wal
PRIVATE transport
PRIVATE cjson
PRIVATE sync
)
\ No newline at end of file
)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -17,11 +17,13 @@
#define _TD_MND_INT_H_
#include "mndDef.h"
#include "sdb.h"
#include "tcache.h"
#include "tep.h"
#include "tqueue.h"
#include "ttime.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
......@@ -65,6 +67,7 @@ typedef struct {
typedef struct {
int32_t errCode;
sem_t syncSem;
SWal *pWal;
SSyncNode *pSyncNode;
ESyncState state;
} SSyncMgmt;
......
......@@ -354,7 +354,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
}
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else {
if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL) {
......
......@@ -152,7 +152,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
}
int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow->numOfRows,
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s, result:%s", pShow->id, pShow->numOfRows,
pShow->numOfColumns, mndShowStr(type), tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -15,11 +15,110 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "mndTrans.h"
static int32_t mndInitWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) return -1;
return 0;
}
static void mndCloseWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
pMgmt->pWal = NULL;
}
}
static int32_t mndRestoreWal(SMnode *pMnode) {
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
int32_t code = -1;
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
if (pHandle == NULL) return -1;
int64_t first = walGetFirstVer(pWal);
int64_t last = walGetLastVer(pWal);
mDebug("restore sdb wal start, sdb ver:%" PRId64 ", wal first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
first = MAX(lastSdbVer + 1, first);
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
if (walReadWithHandle(pHandle, ver) < 0) {
mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver);
goto WAL_RESTORE_OVER;
}
SWalHead *pHead = pHandle->pHead;
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
if (sdbVer + 1 != ver) {
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
mError("failed to read wal from sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver);
goto WAL_RESTORE_OVER;
}
if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) {
mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver);
goto WAL_RESTORE_OVER;
}
sdbUpdateVer(pSdb, 1);
mDebug("wal:%" PRId64 ", is restored", ver);
}
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER;
}
if (sdbVer != lastSdbVer) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
if (sdbWriteFile(pSdb) != 0) {
goto WAL_RESTORE_OVER;
}
}
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
code = 0;
WAL_RESTORE_OVER:
walCloseReadHandle(pHandle);
return code;
}
int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
if (mndInitWal(pMnode) < 0) {
mError("failed to open wal since %s", terrstr());
return -1;
}
if (mndRestoreWal(pMnode) < 0) {
mError("failed to restore wal since %s", terrstr());
return -1;
}
pMgmt->state = TAOS_SYNC_STATE_LEADER;
pMgmt->pSyncNode = NULL;
return 0;
......@@ -28,6 +127,7 @@ int32_t mndInitSync(SMnode *pMnode) {
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_destroy(&pMgmt->syncSem);
mndCloseWal(pMnode);
}
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
......@@ -41,6 +141,20 @@ static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSync
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
int64_t ver = sdbUpdateVer(pSdb, 1);
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
sdbUpdateVer(pSdb, -1);
mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver);
return -1;
}
mTrace("raw:%p, write to wal, ver:%" PRId64, pRaw, ver);
walCommit(pWal, ver);
walFsync(pWal, true);
#if 1
return 0;
#else
......
......@@ -169,7 +169,7 @@ TRANS_ENCODE_OVER:
return NULL;
}
mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
return pRaw;
}
......@@ -226,6 +226,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
if (pData == NULL) goto TRANS_DECODE_OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
......@@ -235,6 +236,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
if (pData == NULL) goto TRANS_DECODE_OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
......@@ -243,6 +245,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < commitLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
if (pData == NULL) goto TRANS_DECODE_OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
......@@ -284,13 +288,13 @@ TRANS_DECODE_OVER:
return NULL;
}
mTrace("trans:%d, decode from raw:%p, data:%p", pTrans->id, pRaw, pTrans);
mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
return pRow;
}
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, data:%p", pTrans->id, pTrans);
mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);
return 0;
}
......@@ -303,13 +307,13 @@ static void mndTransDropData(STrans *pTrans) {
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans);
mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);
mndTransDropData(pTrans);
return 0;
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
mTrace("trans:%d, perform update action, data:%p", pOldTrans->id, pOldTrans);
mTrace("trans:%d, perform update action, old_row:%p new_row:%p", pOldTrans->id, pOldTrans, pNewTrans);
pOldTrans->stage = pNewTrans->stage;
return 0;
}
......
......@@ -185,7 +185,7 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
for (int32_t s = pos; s >= 0; s--) {
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
mDebug("step:%s will cleanup", pStep->name);
mDebug("%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)(pMnode);
}
......@@ -204,12 +204,12 @@ static int32_t mndExecSteps(SMnode *pMnode) {
if ((*pStep->initFp)(pMnode) != 0) {
int32_t code = terrno;
mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
mndCleanupSteps(pMnode, pos);
terrno = code;
return -1;
} else {
mDebug("step:%s is initialized", pStep->name);
mDebug("%s is initialized", pStep->name);
}
}
......@@ -357,7 +357,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
return NULL;
}
......@@ -365,7 +365,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
return NULL;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
......@@ -374,12 +374,12 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
pMsg->rpcMsg = *pRpcMsg;
pMsg->createdTime = taosGetTimestampSec();
mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle);
mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user);
return pMsg;
}
void mndCleanupMsg(SMnodeMsg *pMsg) {
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
mTrace("msg:%p, is destroyed, app:%p RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg);
......@@ -397,37 +397,37 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType & 1U);
mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, TMSG_INFO(msgType));
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END;
}
if (isReq && pMsg->rpcMsg.pCont == NULL) {
code = TSDB_CODE_MND_INVALID_MSG_LEN;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END;
}
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
if (fp == NULL) {
code = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, app:%p failed to process since no handle", pMsg, ahandle);
mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
goto PROCESS_RPC_END;
}
code = (*fp)(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mTrace("msg:%p, app:%p in progressing", pMsg, ahandle);
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
return;
} else if (code != 0) {
code = terrno;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END;
} else {
mTrace("msg:%p, app:%p is processed", pMsg, ahandle);
mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
}
PROCESS_RPC_END:
......
enable_testing()
add_subdirectory(acct)
add_subdirectory(user)
aux_source_directory(. ACCT_SRC)
add_executable(dnode_test_acct ${ACCT_SRC})
add_executable(mnode_test_acct ${ACCT_SRC})
target_link_libraries(
dnode_test_acct
mnode_test_acct
PUBLIC sut
)
add_test(
NAME dnode_test_acct
COMMAND dnode_test_acct
NAME mnode_test_acct
COMMAND mnode_test_acct
)
/**
* @file acct.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module acct-msg tests
* @brief MNODE module acct-msg tests
* @version 0.1
* @date 2021-12-15
*
......@@ -13,7 +13,7 @@
class DndTestAcct : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_acct", 9012); }
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_acct", 9012); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
......
aux_source_directory(. USER_SRC)
add_executable(dnode_test_user ${USER_SRC})
add_executable(mnode_test_user ${USER_SRC})
target_link_libraries(
dnode_test_user
mnode_test_user
PUBLIC sut
)
add_test(
NAME dnode_test_user
COMMAND dnode_test_user
NAME mnode_test_user
COMMAND mnode_test_user
)
/**
* @file user.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module user-msg tests
* @brief MNODE module user-msg tests
* @version 0.1
* @date 2021-12-15
*
......@@ -13,7 +13,7 @@
class DndTestUser : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_user", 9140); }
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9140); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
......
......@@ -17,11 +17,12 @@
#define _TD_SDB_INT_H_
#include "os.h"
#include "sdb.h"
#include "tmsg.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus
extern "C" {
......@@ -59,7 +60,8 @@ typedef struct SSdb {
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int32_t maxId[SDB_MAX];
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX];
......@@ -71,8 +73,6 @@ typedef struct SSdb {
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
int32_t sdbWriteFile(SSdb *pSdb);
const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
......
......@@ -49,8 +49,13 @@ SSdb *sdbInit(SSdbOpt *pOption) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]);
pSdb->maxId[i] = 0;
pSdb->tableVer[i] = -1;
pSdb->keyTypes[i] = SDB_KEY_INT32;
}
pSdb->curVer = -1;
pSdb->lastCommitVer = -1;
pSdb->pMnode = pOption->pMnode;
mDebug("sdb init successfully");
return pSdb;
......@@ -59,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) {
void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb");
// if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb);
// }
if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for current ver:%" PRId64 " != last commit ver:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb);
}
if (pSdb->currDir != NULL) {
tfree(pSdb->currDir);
......@@ -133,7 +138,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->maxId[sdbType] = 0;
pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]);
mDebug("sdb table:%d is initialized", sdbType);
mDebug("sdb table:%s is initialized", sdbTableName(sdbType));
return 0;
}
......@@ -159,3 +164,8 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0;
}
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) {
pSdb->curVer += val;
return pSdb->curVer;
}
\ No newline at end of file
......@@ -17,10 +17,13 @@
#include "sdbInt.h"
#include "tchecksum.h"
#define SDB_TABLE_SIZE 24
#define SDB_RESERVE_SIZE 512
static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb");
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue;
......@@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
return 0;
}
static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) {
int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1;
ret = taosReadFile(fd, &maxId, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (i < SDB_MAX) {
pSdb->maxId[i] = maxId;
}
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1;
ret = taosReadFile(fd, &ver, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (i < SDB_MAX) {
pSdb->tableVer[i] = ver;
}
}
char reserve[SDB_RESERVE_SIZE] = {0};
ret = taosReadFile(fd, reserve, sizeof(reserve));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(reserve)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
return 0;
}
static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) {
if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1;
if (i < SDB_MAX) {
maxId = pSdb->maxId[i];
}
if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1;
if (i < SDB_MAX) {
ver = pSdb->tableVer[i];
}
if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
char reserve[SDB_RESERVE_SIZE] = {0};
if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
int32_t sdbReadFile(SSdb *pSdb) {
int64_t offset = 0;
int32_t code = 0;
......@@ -43,12 +140,13 @@ int32_t sdbReadFile(SSdb *pSdb) {
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read file since %s", terrstr());
mError("failed read file since %s", terrstr());
return -1;
}
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
FileFd fd = taosOpenFileRead(file);
if (fd <= 0) {
......@@ -58,6 +156,14 @@ int32_t sdbReadFile(SSdb *pSdb) {
return 0;
}
if (sdbReadFileHead(pSdb, fd) != 0) {
mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1;
free(pRaw);
taosCloseFile(fd);
return -1;
}
while (1) {
readLen = sizeof(SSdbRaw);
ret = taosReadFile(fd, pRaw, readLen);
......@@ -104,6 +210,8 @@ int32_t sdbReadFile(SSdb *pSdb) {
}
code = 0;
pSdb->lastCommitVer = pSdb->curVer;
mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
PARSE_SDB_DATA_ERROR:
taosCloseFile(fd);
......@@ -130,11 +238,17 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return -1;
}
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
if (sdbWriteFileHead(pSdb, fd) != 0) {
mError("failed to write file:%s head since %s", tmpfile, terrstr());
taosCloseFile(fd);
return -1;
}
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue;
mTrace("sdb write %s, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i];
SRWLatch *pLock = &pSdb->locks[i];
......@@ -155,7 +269,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
pRaw->status = pRow->status;
int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
code = TAOS_SYSTEM_ERROR(terrno);
code = TAOS_SYSTEM_ERROR(errno);
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
......@@ -163,7 +277,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = TAOS_SYSTEM_ERROR(terrno);
code = TAOS_SYSTEM_ERROR(errno);
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
......@@ -201,7 +315,8 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (code != 0) {
mError("failed to write file:%s since %s", curfile, tstrerror(code));
} else {
mDebug("write file:%s successfully", curfile);
pSdb->lastCommitVer = pSdb->curVer;
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
}
terrno = code;
......
......@@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) {
return "auth";
case SDB_ACCT:
return "acct";
case SDB_CONSUMER:
return "consumer";
case SDB_CGROUP:
return "cgroup";
case SDB_TOPIC:
return "topic";
case SDB_VGROUP:
......@@ -53,24 +57,41 @@ const char *sdbTableName(ESdbType type) {
}
}
static const char *sdbStatusStr(ESdbStatus status) {
switch (status) {
case SDB_STATUS_CREATING:
return "creating";
case SDB_STATUS_UPDATING:
return "updating";
case SDB_STATUS_DROPPING:
return "dropping";
case SDB_STATUS_READY:
return "ready";
case SDB_STATUS_DROPPED:
return "dropped";
default:
return "undefine";
}
}
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
EKeyType keyType = pSdb->keyTypes[pRow->type];
if (keyType == SDB_KEY_BINARY) {
mTrace("%s:%s, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
pRow->pObj);
mTrace("%s:%s, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount,
oper, pRow->pObj, sdbStatusStr(pRow->status));
} else if (keyType == SDB_KEY_INT32) {
mTrace("%s:%d, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, oper,
pRow->pObj);
mTrace("%s:%d, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
} else if (keyType == SDB_KEY_INT64) {
mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj);
mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
} else {
}
}
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
if (type >= SDB_MAX || type <= SDB_START) {
if (type >= SDB_MAX || type < 0) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
return NULL;
}
......@@ -100,8 +121,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
}
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
......@@ -126,10 +145,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWUnLockLatch(pLock);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
if (insertFp != NULL) {
code = (*insertFp)(pSdb, pRow->pObj);
......@@ -143,12 +159,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
}
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
pSdb->tableVer[pRow->type]++;
return 0;
}
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosRLockLatch(pLock);
......@@ -157,23 +179,25 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosRUnLockLatch(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
}
SSdbRow *pOldRow = *ppOldRow;
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "updateRow");
taosRUnLockLatch(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
}
sdbFreeRow(pSdb, pNewRow);
pSdb->tableVer[pOldRow->type]++;
return code;
}
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
......@@ -187,12 +211,15 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "deleteRow");
taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock);
// sdbRelease(pSdb, pOldRow->pObj);
pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow);
return code;
// sdbRelease(pSdb, pOldRow->pObj);
return 0;
}
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
......@@ -277,7 +304,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pObj == NULL) return;
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return;
if (pRow->type >= SDB_MAX ) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock);
......
......@@ -20,6 +20,8 @@
#include "tchecksum.h"
#include "wal.h"
#include "taoserror.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -19,8 +19,10 @@
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
......
......@@ -148,7 +148,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
int32_t walEndSnapshot(SWal *pWal) {
int64_t ver = pWal->vers.verInSnapshotting;
if (ver == -1) return -1;
if (ver == -1) return 0;
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
......
......@@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_WAl_VER, "Invalid wal version")
// mnode-dnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists")
......
......@@ -46,7 +46,6 @@ print =============== create child table
sql create table c1 using st tags(1)
sql create table c2 using st tags(2)
return
sql show tables
if $rows != 2 then
return -1
......@@ -56,6 +55,8 @@ print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
return
print =============== insert data
sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册