提交 a7e1c7ce 编写于 作者: S slguan

[TD-52] refactor sdb codes

上级 0e146b78
......@@ -49,11 +49,10 @@ typedef struct _mnode_obj {
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
int8_t role;
int8_t status;
uint16_t port;
uint32_t privateIp;
uint32_t publicIp;
uint16_t port;
int8_t role;
char mnodeName[TSDB_NODE_NAME_LEN + 1];
} SMnodeObj;
......
......@@ -28,27 +28,28 @@ enum _TAOS_MN_STATUS {
TAOS_MN_STATUS_READY
};
// general implementation
int32_t mpeerInit();
void mpeerCleanup();
// special implementation
int32_t mpeerInitMnodes();
void mpeerCleanupMnodes();
int32_t mpeerAddMnode(int32_t dnodeId);
int32_t mpeerRemoveMnode(int32_t dnodeId);
void * mpeerGetMnode(int32_t mnodeId);
int32_t mpeerGetMnodesNum();
void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode);
void mpeerReleaseMnode(struct _mnode_obj *pMnode);
bool mpeerInServerStatus();
bool mpeerIsMaster();
bool mpeerCheckRedirect();
void mpeerGetPrivateIpList(SRpcIpSet *ipSet);
void mpeerGetPublicIpList(SRpcIpSet *ipSet);
void mpeerGetMpeerInfos(void *mpeers);
char * mpeerGetMnodeStatusStr(int32_t status);
char * mpeerGetMnodeRoleStr(int32_t role);
int32_t mpeerAddMnode(int32_t dnodeId);
int32_t mpeerRemoveMnode(int32_t dnodeId);
int32_t sdbForwardDbReqToPeer(void *pHead);
int32_t mpeerForwardReqToPeer(void *pHead);
#ifdef __cplusplus
}
......
......@@ -31,6 +31,7 @@ struct _dnode_obj;
int32_t balanceInit();
void balanceCleanUp();
void balanceNotify();
void balanceReset();
int32_t balanceAllocVnodes(struct _vg_obj *pVgroup);
int32_t balanceDropDnode(struct _dnode_obj *pDnode);
......
......@@ -64,25 +64,22 @@ typedef struct {
int32_t (*encodeFp)(SSdbOperDesc *pOper);
int32_t (*decodeFp)(SSdbOperDesc *pDesc);
int32_t (*destroyFp)(SSdbOperDesc *pDesc);
int32_t (*updateAllFp)();
int32_t (*restoredFp)();
} SSdbTableDesc;
typedef struct {
int32_t code;
int64_t version;
void * sync;
void * wal;
sem_t sem;
pthread_mutex_t mutex;
} SSdbObject;
int32_t sdbInit();
void sdbCleanUp();
SSdbObject *sdbGetObj();
int sdbProcessWrite(void *param, void *data, int type);
void * sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle);
int sdbProcessWrite(void *param, void *data, int type);
int32_t sdbInsertRow(SSdbOperDesc *pOper);
int32_t sdbDeleteRow(SSdbOperDesc *pOper);
......
......@@ -102,7 +102,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDbActionUpdateAll() {
static int32_t mgmtDbActionRestored() {
return 0;
}
......@@ -123,7 +123,7 @@ int32_t mgmtInitDbs() {
.encodeFp = mgmtDbActionEncode,
.decodeFp = mgmtDbActionDecode,
.destroyFp = mgmtDbActionDestroy,
.updateAllFp = mgmtDbActionUpdateAll
.restoredFp = mgmtDbActionRestored
};
tsDbSdb = sdbOpenTable(&tableDesc);
......
......@@ -19,12 +19,9 @@
#include "trpc.h"
#include "tsync.h"
#include "mpeer.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
extern int32_t mpeerInitMnodes();
extern void mpeerCleanupMnodes();
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -34,18 +31,24 @@ static SMnodeObj tsMnodeObj = {0};
int32_t mpeerInitMnodes() {
tsMnodeObj.mnodeId = 1;
tsMnodeObj.dnodeId = 1;
tsMnodeObj.privateIp = inet_addr(tsPrivateIp);
tsMnodeObj.publicIp = inet_addr(tsPublicIp);
tsMnodeObj.createdTime = taosGetTimestampMs();
tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER;
tsMnodeObj.status = TAOS_MN_STATUS_READY;
tsMnodeObj.port = tsMnodeDnodePort;
sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId);
return TSDB_CODE_SUCCESS;
}
void mpeerCleanupMnodes() {}
int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; }
int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; }
void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; }
int32_t mpeerGetMnodesNum() { return 1; }
void mpeerReleaseMnode(struct _mnode_obj *pMnode) {}
bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; }
void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) {
if (*pMnode == NULL) {
*pMnode = &tsMnodeObj;
......@@ -58,20 +61,21 @@ void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) {
void mpeerGetPrivateIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->port = htons(tsMnodeObj.port);
ipSet->ip[0] = htonl(tsMnodeObj.privateIp);
}
void mpeerGetPublicIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->port = htons(tsMnodeObj.port);
ipSet->ip[0] = htonl(tsMnodeObj.publicIp);
}
void mpeerGetMpeerInfos(void *param) {
SDMNodeInfos *mpeers = param;
mpeers->inUse = 0;
mpeers->nodeNum = 1;
mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId);
mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp);
......@@ -79,12 +83,9 @@ void mpeerGetMpeerInfos(void *param) {
strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName);
}
void mpeerCleanupMnodes() {}
int32_t mpeerGetMnodesNum() { return 1; }
void mpeerReleaseMnode(struct _mnode_obj *pMnode) {}
bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; }
bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; }
bool mpeerCheckRedirect() { return false; }
int32_t mpeerForwardReqToPeer(void *pHead) {
return TSDB_CODE_SUCCESS;
}
#endif
......@@ -98,20 +99,7 @@ void mpeerCleanup() {
mpeerCleanupMnodes();
}
char *mpeerGetMnodeStatusStr(int32_t status) {
switch (status) {
case TAOS_MN_STATUS_OFFLINE:
return "offline";
case TAOS_MN_STATUS_DROPPING:
return "dropping";
case TAOS_MN_STATUS_READY:
return "ready";
default:
return "undefined";
}
}
char *mpeerGetMnodeRoleStr(int32_t role) {
static char *mpeerGetMnodeRoleStr(int32_t role) {
switch (role) {
case TAOS_SYNC_ROLE_OFFLINE:
return "offline";
......@@ -159,12 +147,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
......@@ -219,14 +201,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role));
cols++;
numOfRows++;
mpeerReleaseMnode(pMnode);
}
pShow->numOfReads += numOfRows;
......
......@@ -15,18 +15,13 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "trpc.h"
#include "tutil.h"
#include "twal.h"
#include "tsync.h"
#include "mpeer.h"
#include "hashint.h"
#include "hashstr.h"
#include "mpeer.h"
#include "mgmtSdb.h"
typedef struct _SSdbTable {
......@@ -39,13 +34,13 @@ typedef struct _SSdbTable {
int32_t autoIndex;
int64_t numOfRows;
void * iHandle;
int32_t (*insertFp)(SSdbOperDesc *pDesc);
int32_t (*deleteFp)(SSdbOperDesc *pOper);
int32_t (*updateFp)(SSdbOperDesc *pOper);
int32_t (*decodeFp)(SSdbOperDesc *pOper);
int32_t (*encodeFp)(SSdbOperDesc *pOper);
int32_t (*destroyFp)(SSdbOperDesc *pOper);
int32_t (*updateAllFp)();
int32_t (*insertFp)(SSdbOperDesc *pDesc);
int32_t (*deleteFp)(SSdbOperDesc *pOper);
int32_t (*updateFp)(SSdbOperDesc *pOper);
int32_t (*decodeFp)(SSdbOperDesc *pOper);
int32_t (*encodeFp)(SSdbOperDesc *pOper);
int32_t (*destroyFp)(SSdbOperDesc *pOper);
int32_t (*restoredFp)();
pthread_mutex_t mutex;
} SSdbTable;
......@@ -105,15 +100,8 @@ static void *sdbGetTableFromId(int32_t tableId) {
return tsSdbTableList[tableId];
}
#ifndef _MPEER
int32_t sdbForwardDbReqToPeer(void *pHead) {
return TSDB_CODE_SUCCESS;
}
#endif
int32_t sdbInit() {
tsSdbObj = calloc(1, sizeof(SSdbObject));
sem_init(&tsSdbObj->sem, 0, 0);
pthread_mutex_init(&tsSdbObj->mutex, NULL);
SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1};
......@@ -131,8 +119,8 @@ int32_t sdbInit() {
for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) {
SSdbTable *pTable = sdbGetTableFromId(tableId);
if (pTable == NULL) continue;
if (pTable->updateAllFp) {
(*pTable->updateAllFp)();
if (pTable->restoredFp) {
(*pTable->restoredFp)();
}
totalRows += pTable->numOfRows;
......@@ -146,7 +134,6 @@ int32_t sdbInit() {
void sdbCleanUp() {
if (tsSdbObj) {
sem_destroy(&tsSdbObj->sem);
pthread_mutex_destroy(&tsSdbObj->mutex);
walClose(tsSdbObj->wal);
free(tsSdbObj);
......@@ -268,7 +255,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_
tsSdbObj->version++;
pHead->version = tsSdbObj->version;
code = sdbForwardDbReqToPeer(pHead);
code = mpeerForwardReqToPeer(pHead);
if (code != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&tsSdbObj->mutex);
sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
......@@ -523,7 +510,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->encodeFp = pDesc->encodeFp;
pTable->decodeFp = pDesc->decodeFp;
pTable->destroyFp = pDesc->destroyFp;
pTable->updateAllFp = pDesc->updateAllFp;
pTable->restoredFp = pDesc->restoredFp;
if (sdbInitIndexFp[pTable->keyType] != NULL) {
pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta));
......
......@@ -42,7 +42,6 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *sec
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg);
......@@ -142,19 +141,13 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return;
}
if (mpeerCheckRedirect()) {
if (!mpeerIsMaster()) {
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (!mpeerInServerStatus()) {
mgmtProcessMsgWhileNotReady(rpcMsg);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
rpcFreeCont(rpcMsg->pCont);
......@@ -501,18 +494,6 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) {
mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]);
SRpcMsg rpcRsp = {
.msgType = 0,
.pCont = 0,
.contLen = 0,
.code = TSDB_CODE_NOT_READY,
.handle = rpcMsg->handle
};
rpcSendResponse(&rpcRsp);
}
void mgmtSendSimpleResp(void *thandle, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = 0,
......
......@@ -220,7 +220,7 @@ static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionUpdateAll() {
static int32_t mgmtChildTableActionRestored() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
......@@ -320,7 +320,7 @@ static int32_t mgmtInitChildTables() {
.encodeFp = mgmtChildTableActionEncode,
.decodeFp = mgmtChildTableActionDecode,
.destroyFp = mgmtChildTableActionDestroy,
.updateAllFp = mgmtChildTableActionUpdateAll
.restoredFp = mgmtChildTableActionRestored
};
tsChildTableSdb = sdbOpenTable(&tableDesc);
......@@ -414,7 +414,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSuperTableActionUpdateAll() {
static int32_t mgmtSuperTableActionRestored() {
return 0;
}
......@@ -435,7 +435,7 @@ static int32_t mgmtInitSuperTables() {
.encodeFp = mgmtSuperTableActionEncode,
.decodeFp = mgmtSuperTableActionDecode,
.destroyFp = mgmtSuperTableActionDestroy,
.updateAllFp = mgmtSuperTableActionUpdateAll
.restoredFp = mgmtSuperTableActionRestored
};
tsSuperTableSdb = sdbOpenTable(&tableDesc);
......
......@@ -84,7 +84,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtUserActionUpdateAll() {
static int32_t mgmtUserActionRestored() {
SAcctObj *pAcct = acctGetAcct("root");
mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
......@@ -111,7 +111,7 @@ int32_t mgmtInitUsers() {
.encodeFp = mgmtUserActionEncode,
.decodeFp = mgmtUserActionDecode,
.destroyFp = mgmtUserActionDestroy,
.updateAllFp = mgmtUserActionUpdateAll
.restoredFp = mgmtUserActionRestored
};
tsUserSdb = sdbOpenTable(&tableDesc);
......
......@@ -152,7 +152,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtVgroupActionUpdateAll() {
static int32_t mgmtVgroupActionRestored() {
return 0;
}
......@@ -173,7 +173,7 @@ int32_t mgmtInitVgroups() {
.encodeFp = mgmtVgroupActionEncode,
.decodeFp = mgmtVgroupActionDecode,
.destroyFp = mgmtVgroupActionDestroy,
.updateAllFp = mgmtVgroupActionUpdateAll,
.restoredFp = mgmtVgroupActionRestored,
};
tsVgroupSdb = sdbOpenTable(&tableDesc);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode2 -s start
sql connect
\ No newline at end of file
system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3
system sh/cfg.sh -n dnode1 -c numOfMPeers -v 3
system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册