提交 5252e177 编写于 作者: S Shengliang Guan

TD-10431 user manage

上级 09e20e3d
...@@ -125,8 +125,7 @@ typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyTy ...@@ -125,8 +125,7 @@ typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyTy
typedef enum { typedef enum {
SDB_STATUS_CREATING = 1, SDB_STATUS_CREATING = 1,
SDB_STATUS_READY = 2, SDB_STATUS_READY = 2,
SDB_STATUS_DROPPING = 3, SDB_STATUS_DROPPED = 3
SDB_STATUS_DROPPED = 4
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
...@@ -258,7 +257,7 @@ int32_t sdbDeploy(SSdb *pSdb); ...@@ -258,7 +257,7 @@ int32_t sdbDeploy(SSdb *pSdb);
int32_t sdbReadFile(SSdb *pSdb); int32_t sdbReadFile(SSdb *pSdb);
/** /**
* @brief Parse and write raw data to sdb. * @brief Parse and write raw data to sdb, then free the pRaw object
* *
* @param pSdb The sdb object. * @param pSdb The sdb object.
* @param pRaw The raw data. * @param pRaw The raw data.
...@@ -266,6 +265,15 @@ int32_t sdbReadFile(SSdb *pSdb); ...@@ -266,6 +265,15 @@ int32_t sdbReadFile(SSdb *pSdb);
*/ */
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw);
/**
* @brief Parse and write raw data to sdb.
*
* @param pSdb The sdb object.
* @param pRaw The raw data.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw);
/** /**
* @brief Acquire a row from sdb * @brief Acquire a row from sdb
* *
......
...@@ -34,7 +34,6 @@ void initLog(const char* path) { ...@@ -34,7 +34,6 @@ void initLog(const char* path) {
sDebugFlag = 0; sDebugFlag = 0;
tsdbDebugFlag = 0; tsdbDebugFlag = 0;
cqDebugFlag = 0; cqDebugFlag = 0;
debugFlag = 0;
char temp[PATH_MAX]; char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", path); snprintf(temp, PATH_MAX, "%s/taosdlog", path);
......
add_executable(dndTestUser "")
target_sources(dndTestUser
PRIVATE
"user.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dndTestUser
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestUser
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
enable_testing()
add_test(
NAME dndTestUser
COMMAND dndTestUser
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTestUser : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
static void SetUpTestSuite() {
const char* user = "root";
const char* pass = "taosdata";
const char* path = "/tmp/dndTestUser";
const char* fqdn = "localhost";
uint16_t port = 9524;
pServer = createServer(path, fqdn, port);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
static void TearDownTestSuite() {
dropServer(pServer);
dropClient(pClient);
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
};
SServer* DndTestUser::pServer;
SClient* DndTestUser::pClient;
int32_t DndTestUser::connId;
#if 0
TEST_F(DndTestUser, ShowUser) {
int32_t showId = 0;
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
strcpy(pShow->db, "");
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "show users");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 4);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "name");
pSchema = &pMeta->schema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, 10 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "privilege");
pSchema = &pMeta->schema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "create_time");
pSchema = &pMeta->schema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "account");
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, 2);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
EXPECT_EQ(pRetrieveRsp->completed, 1);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
int64_t int64Val = 0;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
//--- privilege ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += 10;
EXPECT_STREQ(strVal, "super");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += 10;
EXPECT_STREQ(strVal, "writable");
}
//--- create_time ---
{
int64Val = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(int64Val, 0);
int64Val = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(int64Val, 0);
}
//--- account ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
}
}
#endif
TEST_F(DndTestUser, CreateUser_01) {
ASSERT_NE(pClient, nullptr);
//--- create user ---
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p1");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
sendMsg(pClient, &rpcMsg);
// taosMsleep(10000000);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "u1");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
}
// TEST_F(DndTestUser, AlterUser) {
// ASSERT_NE(pClient, nullptr);
// SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = pReq;
// rpcMsg.contLen = sizeof(SAlterUserMsg);
// rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_ACCT;
// sendMsg(pClient, &rpcMsg);
// SRpcMsg* pMsg = pClient->pRsp;
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
// }
// TEST_F(DndTestUser, DropUser) {
// ASSERT_NE(pClient, nullptr);
// SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = pReq;
// rpcMsg.contLen = sizeof(SDropUserMsg);
// rpcMsg.msgType = TSDB_MSG_TYPE_DROP_ACCT;
// sendMsg(pClient, &rpcMsg);
// SRpcMsg* pMsg = pClient->pRsp;
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
// }
...@@ -318,6 +318,11 @@ typedef struct SMnodeMsg { ...@@ -318,6 +318,11 @@ typedef struct SMnodeMsg {
void *pCont; void *pCont;
} SMnodeMsg; } SMnodeMsg;
typedef struct {
int32_t id;
void *rpcHandle;
} STransMsg;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
int32_t mndInitSync(SMnode *pMnode); int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode);
int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,13 +33,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); ...@@ -33,13 +33,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg);
int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)); int32_t mndTransPrepare(STrans *pTrans);
int32_t mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
int32_t mndTransExecute(SSdb *pSdb, int32_t tranId); int32_t mndTransExecute(SSdb *pSdb, int32_t tranId);
SSdbRaw *mndTransActionEncode(STrans *pTrans);
SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -65,7 +65,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { ...@@ -65,7 +65,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("acct:%s, will be created while deploy sdb", acctObj.acct); mDebug("acct:%s, will be created while deploy sdb", acctObj.acct);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
...@@ -132,14 +132,14 @@ static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { ...@@ -132,14 +132,14 @@ static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
} }
static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
mTrace("acct:%s, perform update action", pSrcAcct->acct); mTrace("acct:%s, perform update action", pDstAcct->acct);
memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN); memcpy(pDstAcct->acct, pSrcAcct->acct, TSDB_USER_LEN);
pSrcAcct->createdTime = pDstAcct->createdTime; pDstAcct->createdTime = pSrcAcct->createdTime;
pSrcAcct->updateTime = pDstAcct->updateTime; pDstAcct->updateTime = pSrcAcct->updateTime;
pSrcAcct->acctId = pDstAcct->acctId; pDstAcct->acctId = pSrcAcct->acctId;
pSrcAcct->status = pDstAcct->status; pDstAcct->status = pSrcAcct->status;
pSrcAcct->cfg = pDstAcct->cfg; pDstAcct->cfg = pSrcAcct->cfg;
return 0; return 0;
} }
......
...@@ -108,7 +108,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { ...@@ -108,7 +108,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
} }
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) { static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) {
mTrace("cluster:%d, perform update action", pSrcCluster->id); mTrace("cluster:%d, perform update action", pDstCluster->id);
return 0; return 0;
} }
...@@ -132,7 +132,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -132,7 +132,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id); mDebug("cluster:%d, will be created while deploy sdb", clusterObj.id);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
......
...@@ -102,12 +102,12 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { ...@@ -102,12 +102,12 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
} }
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) { static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) {
mTrace("dnode:%d, perform update action", pSrcDnode->id); mTrace("dnode:%d, perform update action", pDstDnode->id);
pSrcDnode->id = pDstDnode->id; pDstDnode->id = pSrcDnode->id;
pSrcDnode->createdTime = pDstDnode->createdTime; pDstDnode->createdTime = pSrcDnode->createdTime;
pSrcDnode->updateTime = pDstDnode->updateTime; pDstDnode->updateTime = pSrcDnode->updateTime;
pSrcDnode->port = pDstDnode->port; pDstDnode->port = pSrcDnode->port;
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); memcpy(pDstDnode->fqdn, pSrcDnode->fqdn, TSDB_FQDN_LEN);
return 0; return 0;
} }
...@@ -123,7 +123,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { ...@@ -123,7 +123,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("dnode:%d, will be created while deploy sdb", dnodeObj.id); mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
......
...@@ -82,10 +82,10 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { ...@@ -82,10 +82,10 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) {
} }
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) { static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) {
mTrace("mnode:%d, perform update action", pSrcMnode->id); mTrace("mnode:%d, perform update action", pDstMnode->id);
pSrcMnode->id = pDstMnode->id; pDstMnode->id = pSrcMnode->id;
pSrcMnode->createdTime = pDstMnode->createdTime; pDstMnode->createdTime = pSrcMnode->createdTime;
pSrcMnode->updateTime = pDstMnode->updateTime; pDstMnode->updateTime = pSrcMnode->updateTime;
return 0; return 0;
} }
...@@ -99,7 +99,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { ...@@ -99,7 +99,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("mnode:%d, will be created while deploy sdb", mnodeObj.id); mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
......
...@@ -21,9 +21,15 @@ ...@@ -21,9 +21,15 @@
int32_t mndInitSync(SMnode *pMnode) { return 0; } int32_t mndInitSync(SMnode *pMnode) { return 0; }
void mndCleanupSync(SMnode *pMnode) {} void mndCleanupSync(SMnode *pMnode) {}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, void *pData) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg) {
mndTransApply(pMnode, pData, pData, 0); int32_t code = 0;
free(pData);
int32_t len = sdbGetRawTotalSize(pRaw);
SSdbRaw *pReceived = calloc(1, len);
memcpy(pReceived, pRaw, len);
mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
mndTransApply(pMnode, pReceived, pMsg, code);
return 0; return 0;
} }
......
...@@ -189,12 +189,12 @@ static void mndSendTelemetryReport(SMnode* pMnode) { ...@@ -189,12 +189,12 @@ static void mndSendTelemetryReport(SMnode* pMnode) {
char buf[128] = {0}; char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
mTrace("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno)); mDebug("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno));
return; return;
} }
SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
if (fd < 0) { if (fd < 0) {
mTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); mDebug("failed to create socket for telemetry, reason:%s", strerror(errno));
return; return;
} }
...@@ -228,7 +228,7 @@ static void mndSendTelemetryReport(SMnode* pMnode) { ...@@ -228,7 +228,7 @@ static void mndSendTelemetryReport(SMnode* pMnode) {
// read something to avoid nginx error 499 // read something to avoid nginx error 499
if (taosReadSocket(fd, buf, 10) < 0) { if (taosReadSocket(fd, buf, 10) < 0) {
mTrace("failed to receive response since %s", strerror(errno)); mDebug("failed to receive response since %s", strerror(errno));
} }
taosCloseSocket(fd); taosCloseSocket(fd);
...@@ -297,7 +297,7 @@ int32_t mndInitTelem(SMnode* pMnode) { ...@@ -297,7 +297,7 @@ int32_t mndInitTelem(SMnode* pMnode) {
int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode); int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
if (code != 0) { if (code != 0) {
mTrace("failed to create telemetry thread since :%s", strerror(code)); mDebug("failed to create telemetry thread since :%s", strerror(code));
} }
mInfo("mnd telemetry is initialized"); mInfo("mnd telemetry is initialized");
......
...@@ -15,13 +15,33 @@ ...@@ -15,13 +15,33 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTrans.h" #include "mndTrans.h"
#include "trpc.h" #include "mndSync.h"
#define SDB_TRANS_VER 1 #define SDB_TRANS_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8 #define TRN_DEFAULT_ARRAY_SIZE 8
SSdbRaw *mndTransActionEncode(STrans *pTrans) { static SSdbRaw *mndTransActionEncode(STrans *pTrans);
int32_t rawDataLen = 10 * sizeof(int32_t); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndTransActionEncode,
.decodeFp = (SdbDecodeFp)mndTransActionDecode,
.insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t rawDataLen = 16 * sizeof(int32_t);
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
...@@ -29,23 +49,23 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -29,23 +49,23 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
for (int32_t i = 0; i < commitLogNum; ++i) { for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
if (pRaw == NULL) { if (pRaw == NULL) {
mError("trn:%d, failed to alloc raw since %s", pTrans->id, terrstr()); mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
return NULL; return NULL;
} }
...@@ -60,31 +80,33 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -60,31 +80,33 @@ SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, undoActionNum) SDB_SET_INT32(pRaw, dataPos, undoActionNum)
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
} }
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
} }
for (int32_t i = 0; i < commitLogNum; ++i) { for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len) SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
} }
mDebug("trn:%d, is encoded as raw:%p, len:%d", pTrans->id, pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
return pRaw; return pRaw;
} }
SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr()); mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
...@@ -97,8 +119,8 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -97,8 +119,8 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
return NULL; return NULL;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
STrans *pTrans = sdbGetRowObj(pRow); STrans *pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr()); mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
return NULL; return NULL;
...@@ -112,9 +134,9 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -112,9 +134,9 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) { pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
mDebug("trn:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw); code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; goto TRANS_DECODE_OVER;
} }
int32_t redoLogNum = 0; int32_t redoLogNum = 0;
...@@ -133,81 +155,111 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -133,81 +155,111 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
int32_t code = 0;
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
int32_t dataLen = 0; int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen); char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->redoLogs, pData); void *ret = taosArrayPush(pTrans->redoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break;
}
}
for (int32_t i = 0; i < undoLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->undoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break;
}
}
for (int32_t i = 0; i < commitLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->commitLogs, &pData);
if (ret == NULL) { if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break; break;
} }
} }
TRANS_DECODE_OVER:
if (code != 0) { if (code != 0) {
terrno = code; mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
mError("trn:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
terrno = code;
return NULL; return NULL;
} }
mDebug("trn:%d, is parsed from raw:%p", pTrans->id, pRaw); mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw);
return pRow; return pRow;
} }
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform insert action, stage:%d", pTrans->id, pTrans->stage);
SArray *pArray = pTrans->redoLogs; SArray *pArray = pTrans->redoLogs;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGet(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWrite(pSdb, pRaw); int32_t code = sdbWrite(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
return code; return code;
} }
} }
mDebug("trn:%d, write to sdb", pTrans->id);
return 0; return 0;
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
SArray *pArray = pTrans->redoLogs; mTrace("trans:%d, perform delete action, stage:%d", pTrans->id, pTrans->stage);
SArray *pArray = pTrans->undoLogs;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGet(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWrite(pSdb, pRaw); int32_t code = sdbWrite(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
return code; return code;
} }
} }
mDebug("trn:%d, delete from sdb", pTrans->id);
return 0; return 0;
} }
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pTrans, STrans *pDstTrans) {
assert(true); mTrace("trans:%d, perform update action, stage:%d", pTrans->id, pTrans->stage);
SArray *pArray = pTrans->redoLogs;
SArray *pArray = pDstTrans->commitLogs;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGet(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWrite(pSdb, pRaw); int32_t code = sdbWrite(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); mError("trans:%d, failed to write raw:%p to sdb since %s", pDstTrans->id, pRaw, terrstr());
return code; return code;
} }
} }
pTrans->stage = pDstTrans->stage; pDstTrans->stage = pTrans->stage;
mDebug("trn:%d, update in sdb", pTrans->id);
return 0; return 0;
} }
...@@ -224,6 +276,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { ...@@ -224,6 +276,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans->id = trnGenerateTransId(); pTrans->id = trnGenerateTransId();
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->pMnode = pMnode;
pTrans->rpcHandle = rpcHandle; pTrans->rpcHandle = rpcHandle;
pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
...@@ -238,13 +291,13 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { ...@@ -238,13 +291,13 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return NULL; return NULL;
} }
mDebug("trn:%d, is created, %p", pTrans->id, pTrans); mDebug("trans:%d, data:%p is created", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
static void trnDropArray(SArray *pArray) { static void trnDropArray(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) { for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGet(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
tfree(pRaw); tfree(pRaw);
} }
...@@ -258,13 +311,13 @@ void mndTransDrop(STrans *pTrans) { ...@@ -258,13 +311,13 @@ void mndTransDrop(STrans *pTrans) {
trnDropArray(pTrans->redoActions); trnDropArray(pTrans->redoActions);
trnDropArray(pTrans->undoActions); trnDropArray(pTrans->undoActions);
mDebug("trn:%d, is dropped, %p", pTrans->id, pTrans); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
tfree(pTrans); tfree(pTrans);
} }
void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
pTrans->rpcHandle = rpcHandle; pTrans->rpcHandle = rpcHandle;
mTrace("trn:%d, set rpc handle:%p", pTrans->id, rpcHandle); mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
} }
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
...@@ -273,7 +326,7 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { ...@@ -273,7 +326,7 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
return -1; return -1;
} }
void *ptr = taosArrayPush(pArray, pRaw); void *ptr = taosArrayPush(pArray, &pRaw);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -284,92 +337,92 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { ...@@ -284,92 +337,92 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw); int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw);
mTrace("trn:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code); mTrace("trans:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code);
return code; return code;
} }
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw); int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw);
mTrace("trn:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code); mTrace("trans:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code);
return code; return code;
} }
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw); int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw);
mTrace("trn:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code); mTrace("trans:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code);
return code; return code;
} }
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg); int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg);
mTrace("trn:%d, msg:%p append to redo actions", pTrans->id, pMsg); mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg);
return code; return code;
} }
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg); int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg);
mTrace("trn:%d, msg:%p append to undo actions", pTrans->id, pMsg); mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg);
return code; return code;
} }
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndTransPrepare(STrans *pTrans) {
SSdbTable table = {.sdbType = SDB_TRANS, mDebug("trans:%d, prepare transaction", pTrans->id);
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndTransActionEncode,
.decodeFp = (SdbDecodeFp)mndTransActionDecode,
.insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupTrans(SMnode *pMnode) {}
int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
if (syncfp == NULL) return -1;
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
mError("trn:%d, failed to decode trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
return -1; return -1;
} }
sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
if (sdbWrite(pTrans->pMnode->pSdb, pRaw) != 0) { if (sdbWriteNotFree(pTrans->pMnode->pSdb, pRaw) != 0) {
mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to write trans since %s", pTrans->id, terrstr());
return -1; return -1;
} }
if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) { STransMsg *pMsg = calloc(1, sizeof(STransMsg));
mError("trn:%d, failed to sync trans since %s", pTrans->id, terrstr()); pMsg->id = pTrans->id;
pMsg->rpcHandle = pTrans->rpcHandle;
mDebug("trans:%d, start sync, RPC:%p pMsg:%p", pTrans->id, pTrans->rpcHandle, pMsg);
if (mndSyncPropose(pTrans->pMnode, pRaw, pMsg) != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
free(pMsg);
sdbFreeRaw(pRaw);
return -1; return -1;
} }
sdbFreeRaw(pRaw);
return 0; return 0;
} }
static void trnSendRpcRsp(void *rpcHandle, int32_t code) { static void trnSendRpcRsp(STransMsg *pMsg, int32_t code) {
if (rpcHandle != NULL) { mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p", pMsg->id, pMsg->rpcHandle, code & 0xFFFF, pMsg);
SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno}; if (pMsg->rpcHandle != NULL) {
SRpcMsg rspMsg = {.handle = pMsg->rpcHandle, .code = code};
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
} }
}
int32_t mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, void *pData, int32_t code) { free(pMsg);
if (code != 0) { }
trnSendRpcRsp(pData, terrno);
return 0;
}
if (sdbWrite(pMnode->pSdb, pData) != 0) { void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
code = terrno; if (code == 0) {
trnSendRpcRsp(pData, code); mDebug("trans:%d, commit transaction", pMsg->id);
terrno = code; sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return -1; if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
code = terrno;
mError("trans:%d, failed to write sdb while commit since %s", pMsg->id, terrstr());
}
trnSendRpcRsp(pMsg, code);
} else {
mDebug("trans:%d, rollback transaction", pMsg->id);
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
mError("trans:%d, failed to write sdb while rollback since %s", pMsg->id, terrstr());
}
trnSendRpcRsp(pMsg, code);
} }
return 0;
} }
static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) { static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) {
......
...@@ -32,6 +32,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass ...@@ -32,6 +32,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass
static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg); static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg);
static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
int32_t mndInitUser(SMnode *pMnode) { int32_t mndInitUser(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_USER, SSdbTable table = {.sdbType = SDB_USER,
...@@ -47,6 +50,9 @@ int32_t mndInitUser(SMnode *pMnode) { ...@@ -47,6 +50,9 @@ int32_t mndInitUser(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_USER, mndProcessAlterUserMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_USER, mndProcessAlterUserMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_USER, mndProcessDropUserMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_USER, mndProcessDropUserMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_USER, mndGetUserMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -70,7 +76,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char ...@@ -70,7 +76,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("user:%s, will be created while deploy sdb", userObj.user); mDebug("user:%s, will be created while deploy sdb", userObj.user);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
...@@ -164,14 +170,14 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -164,14 +170,14 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) { static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) {
mTrace("user:%s, perform update action", pSrcUser->user); mTrace("user:%s, perform update action", pSrcUser->user);
memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN); memcpy(pDstUser->user, pSrcUser->user, TSDB_USER_LEN);
memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN); memcpy(pDstUser->pass, pSrcUser->pass, TSDB_KEY_LEN);
memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); memcpy(pDstUser->acct, pSrcUser->acct, TSDB_USER_LEN);
pSrcUser->createdTime = pDstUser->createdTime; pDstUser->createdTime = pSrcUser->createdTime;
pSrcUser->updateTime = pDstUser->updateTime; pDstUser->updateTime = pSrcUser->updateTime;
pSrcUser->superAuth = pDstUser->superAuth; pDstUser->superAuth = pSrcUser->superAuth;
pSrcUser->readAuth = pDstUser->readAuth; pDstUser->readAuth = pSrcUser->readAuth;
pSrcUser->writeAuth = pDstUser->writeAuth; pDstUser->writeAuth = pSrcUser->writeAuth;
return 0; return 0;
} }
...@@ -197,11 +203,15 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -197,11 +203,15 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
userObj.writeAuth = 1; userObj.writeAuth = 1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1; if (pTrans == NULL) {
mError("user:%s, failed to create since %s", user, terrstr());
return -1;
}
mDebug("trans:%d, used to create user:%s", pTrans->id, user);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("failed to append redo log since %s", terrstr()); mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
...@@ -209,7 +219,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -209,7 +219,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj); SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("failed to append undo log since %s", terrstr()); mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
...@@ -217,13 +227,14 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -217,13 +227,14 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj); SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("failed to append commit log since %s", terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pTrans, mndSyncPropose) != 0) { if (mndTransPrepare(pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
...@@ -236,6 +247,8 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) { ...@@ -236,6 +247,8 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
mDebug("user:%s, start to create", pCreate->user);
if (pCreate->user[0] == 0) { if (pCreate->user[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; terrno = TSDB_CODE_MND_INVALID_USER_FORMAT;
mError("user:%s, failed to create since %s", pCreate->user, terrstr()); mError("user:%s, failed to create since %s", pCreate->user, terrstr());
...@@ -284,4 +297,102 @@ static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) { ...@@ -284,4 +297,102 @@ static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
mError("failed to process drop user msg since %s", terrstr()); mError("failed to process drop user msg since %s", terrstr());
return -1; return -1;
}
static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "account");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
strcpy(pMeta->tableFname, "show users");
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SUserObj *pUser = NULL;
int32_t cols = 0;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_USER, pShow->pIter, (void **)&pUser);
if (pShow->pIter == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else if (pUser->writeAuth) {
const char *src = "writable";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else {
const char *src = "readable";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
}
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pUser->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->acct, pShow->bytes[cols]);
cols++;
numOfRows++;
sdbRelease(pSdb, pUser);
}
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
}
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -333,7 +333,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { ...@@ -333,7 +333,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create msg since %s", terrstr()); mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
return NULL; return NULL;
} }
...@@ -341,7 +341,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { ...@@ -341,7 +341,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
mndCleanupMsg(pMsg); mndCleanupMsg(pMsg);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("failed to create msg since %s", terrstr()); mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
return NULL; return NULL;
} }
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
...@@ -350,13 +350,13 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { ...@@ -350,13 +350,13 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
pMsg->rpcMsg = *pRpcMsg; pMsg->rpcMsg = *pRpcMsg;
pMsg->createdTime = taosGetTimestampSec(); pMsg->createdTime = taosGetTimestampSec();
mTrace("msg:%p, is created", pMsg); mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle);
return pMsg; return pMsg;
} }
void mndCleanupMsg(SMnodeMsg *pMsg) { void mndCleanupMsg(SMnodeMsg *pMsg) {
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
mTrace("msg:%p, is destroyed", pMsg);
} }
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
...@@ -371,7 +371,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { ...@@ -371,7 +371,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
void *ahandle = pMsg->rpcMsg.ahandle; void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1); bool isReq = (msgType % 2 == 1);
mTrace("msg:%p, app:%p will be processed", pMsg, ahandle); mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);
if (isReq && !mndIsMaster(pMnode)) { if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY; code = TSDB_CODE_APP_NOT_READY;
...@@ -393,7 +393,10 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { ...@@ -393,7 +393,10 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
} }
code = (*fp)(pMsg); code = (*fp)(pMsg);
if (code != 0) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mTrace("msg:%p, app:%p in progressing", pMsg, ahandle);
return;
} else if (code != 0) {
code = terrno; code = terrno;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END; goto PROCESS_RPC_END;
......
...@@ -71,7 +71,6 @@ typedef struct SSdb { ...@@ -71,7 +71,6 @@ typedef struct SSdb {
} SSdb; } SSdb;
int32_t sdbWriteFile(SSdb *pSdb); int32_t sdbWriteFile(SSdb *pSdb);
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -94,7 +94,7 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -94,7 +94,7 @@ void sdbCleanup(SSdb *pSdb) {
taosHashClear(hash); taosHashClear(hash);
taosHashCleanup(hash); taosHashCleanup(hash);
pSdb->hashObjs[i] = NULL; pSdb->hashObjs[i] = NULL;
mTrace("sdb table:%d is cleaned up", i); mDebug("sdb table:%d is cleaned up", i);
} }
free(pSdb); free(pSdb);
...@@ -129,7 +129,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { ...@@ -129,7 +129,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->hashObjs[sdbType] = hash; pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]); taosInitRWLatch(&pSdb->locks[sdbType]);
mTrace("sdb table:%d is initialized", sdbType); mDebug("sdb table:%d is initialized", sdbType);
return 0; return 0;
} }
\ No newline at end of file
...@@ -118,7 +118,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -118,7 +118,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
break; break;
} }
code = sdbWriteRaw(pSdb, pRaw); code = sdbWriteNotFree(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read file:%s since %s", file, terrstr());
goto PARSE_SDB_DATA_ERROR; goto PARSE_SDB_DATA_ERROR;
......
...@@ -98,7 +98,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -98,7 +98,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
SSdbRow *pDstRow = *ppDstRow; SSdbRow *pDstRow = *ppDstRow;
pRow->status = pRaw->status; pDstRow->status = pRaw->status;
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
SdbUpdateFp updateFp = pSdb->updateFps[pRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[pRow->type];
...@@ -138,7 +138,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -138,7 +138,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return code; return code;
} }
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
SHashObj *hash = sdbGetHash(pSdb, pRaw->type); SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
if (hash == NULL) return terrno; if (hash == NULL) return terrno;
...@@ -158,7 +158,6 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -158,7 +158,6 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) {
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_DROPPED: case SDB_STATUS_DROPPED:
...@@ -170,7 +169,7 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -170,7 +169,7 @@ int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) {
} }
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
int32_t code = sdbWriteRaw(pSdb, pRaw); int32_t code = sdbWriteNotFree(pSdb, pRaw);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return code; return code;
} }
...@@ -201,7 +200,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { ...@@ -201,7 +200,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
terrno = TSDB_CODE_SDB_OBJ_CREATING; terrno = TSDB_CODE_SDB_OBJ_CREATING;
break; break;
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPED:
terrno = TSDB_CODE_SDB_OBJ_DROPPING; terrno = TSDB_CODE_SDB_OBJ_DROPPING;
break; break;
default: default:
......
...@@ -26,10 +26,15 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { ...@@ -26,10 +26,15 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw->type = type; pRaw->type = type;
pRaw->sver = sver; pRaw->sver = sver;
pRaw->dataLen = dataLen; pRaw->dataLen = dataLen;
mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
return pRaw; return pRaw;
} }
void sdbFreeRaw(SSdbRaw *pRaw) { free(pRaw); } void sdbFreeRaw(SSdbRaw *pRaw) {
mTrace("raw:%p, is freed", pRaw);
free(pRaw);
}
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
if (pRaw == NULL) { if (pRaw == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册