未验证 提交 98a86b3c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4376 from taosdata/feature/wal

TD-2266
...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { ...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont; SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) { if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
} }
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
} }
......
...@@ -79,10 +79,13 @@ typedef struct { ...@@ -79,10 +79,13 @@ typedef struct {
int32_t (*fpRestored)(); int32_t (*fpRestored)();
} SSdbTableDesc; } SSdbTableDesc;
int32_t sdbInitRef();
void sdbCleanUpRef();
int32_t sdbInit(); int32_t sdbInit();
void sdbCleanUp(); void sdbCleanUp();
void * sdbOpenTable(SSdbTableDesc *desc); int64_t sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle); void sdbCloseTable(int64_t rid);
void* sdbGetTableByRid(int64_t rid);
bool sdbIsMaster(); bool sdbIsMaster();
bool sdbIsServing(); bool sdbIsServing();
void sdbUpdateMnodeRoles(); void sdbUpdateMnodeRoles();
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
int64_t tsAcctRid = -1;
void * tsAcctSdb = NULL; void * tsAcctSdb = NULL;
static int32_t tsAcctUpdateSize; static int32_t tsAcctUpdateSize;
static int32_t mnodeCreateRootAcct(); static int32_t mnodeCreateRootAcct();
...@@ -114,7 +115,8 @@ int32_t mnodeInitAccts() { ...@@ -114,7 +115,8 @@ int32_t mnodeInitAccts() {
.fpRestored = mnodeAcctActionRestored .fpRestored = mnodeAcctActionRestored
}; };
tsAcctSdb = sdbOpenTable(&desc); tsAcctRid = sdbOpenTable(&desc);
tsAcctSdb = sdbGetTableByRid(tsAcctRid);
if (tsAcctSdb == NULL) { if (tsAcctSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, failed to create hash", desc.name);
return -1; return -1;
...@@ -126,7 +128,7 @@ int32_t mnodeInitAccts() { ...@@ -126,7 +128,7 @@ int32_t mnodeInitAccts() {
void mnodeCleanupAccts() { void mnodeCleanupAccts() {
acctCleanUp(); acctCleanUp();
sdbCloseTable(tsAcctSdb); sdbCloseTable(tsAcctRid);
tsAcctSdb = NULL; tsAcctSdb = NULL;
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "mnodeShow.h" #include "mnodeShow.h"
#include "tglobal.h" #include "tglobal.h"
int64_t tsClusterRid = -1;
static void * tsClusterSdb = NULL; static void * tsClusterSdb = NULL;
static int32_t tsClusterUpdateSize; static int32_t tsClusterUpdateSize;
static char tsClusterId[TSDB_CLUSTER_ID_LEN]; static char tsClusterId[TSDB_CLUSTER_ID_LEN];
...@@ -101,9 +102,10 @@ int32_t mnodeInitCluster() { ...@@ -101,9 +102,10 @@ int32_t mnodeInitCluster() {
.fpRestored = mnodeClusterActionRestored .fpRestored = mnodeClusterActionRestored
}; };
tsClusterSdb = sdbOpenTable(&desc); tsClusterRid = sdbOpenTable(&desc);
tsClusterSdb = sdbGetTableByRid(tsClusterRid);
if (tsClusterSdb == NULL) { if (tsClusterSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, rid:%" PRId64 ", failed to create hash", desc.name, tsClusterRid);
return -1; return -1;
} }
...@@ -116,7 +118,7 @@ int32_t mnodeInitCluster() { ...@@ -116,7 +118,7 @@ int32_t mnodeInitCluster() {
} }
void mnodeCleanupCluster() { void mnodeCleanupCluster() {
sdbCloseTable(tsClusterSdb); sdbCloseTable(tsClusterRid);
tsClusterSdb = NULL; tsClusterSdb = NULL;
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#define VG_LIST_SIZE 8 #define VG_LIST_SIZE 8
int64_t tsDbRid = -1;
static void * tsDbSdb = NULL; static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
...@@ -160,7 +161,8 @@ int32_t mnodeInitDbs() { ...@@ -160,7 +161,8 @@ int32_t mnodeInitDbs() {
.fpRestored = mnodeDbActionRestored .fpRestored = mnodeDbActionRestored
}; };
tsDbSdb = sdbOpenTable(&desc); tsDbRid = sdbOpenTable(&desc);
tsDbSdb = sdbGetTableByRid(tsDbRid);
if (tsDbSdb == NULL) { if (tsDbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
...@@ -496,7 +498,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) { ...@@ -496,7 +498,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) {
} }
void mnodeCleanupDbs() { void mnodeCleanupDbs() {
sdbCloseTable(tsDbSdb); sdbCloseTable(tsDbRid);
tsDbSdb = NULL; tsDbSdb = NULL;
} }
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include "mnodeCluster.h" #include "mnodeCluster.h"
int32_t tsAccessSquence = 0; int32_t tsAccessSquence = 0;
int64_t tsDnodeRid = -1;
static void * tsDnodeSdb = NULL; static void * tsDnodeSdb = NULL;
static int32_t tsDnodeUpdateSize = 0; static int32_t tsDnodeUpdateSize = 0;
extern void * tsMnodeSdb; extern void * tsMnodeSdb;
...@@ -187,7 +188,8 @@ int32_t mnodeInitDnodes() { ...@@ -187,7 +188,8 @@ int32_t mnodeInitDnodes() {
.fpRestored = mnodeDnodeActionRestored .fpRestored = mnodeDnodeActionRestored
}; };
tsDnodeSdb = sdbOpenTable(&desc); tsDnodeRid = sdbOpenTable(&desc);
tsDnodeSdb = sdbGetTableByRid(tsDnodeRid);
if (tsDnodeSdb == NULL) { if (tsDnodeSdb == NULL) {
mError("failed to init dnodes data"); mError("failed to init dnodes data");
return -1; return -1;
...@@ -213,7 +215,7 @@ int32_t mnodeInitDnodes() { ...@@ -213,7 +215,7 @@ int32_t mnodeInitDnodes() {
} }
void mnodeCleanupDnodes() { void mnodeCleanupDnodes() {
sdbCloseTable(tsDnodeSdb); sdbCloseTable(tsDnodeRid);
pthread_mutex_destroy(&tsDnodeEpsMutex); pthread_mutex_destroy(&tsDnodeEpsMutex);
free(tsDnodeEps); free(tsDnodeEps);
tsDnodeEps = NULL; tsDnodeEps = NULL;
......
...@@ -47,6 +47,7 @@ void *tsMnodeTmr = NULL; ...@@ -47,6 +47,7 @@ void *tsMnodeTmr = NULL;
static bool tsMgmtIsRunning = false; static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = { static const SMnodeComponent tsMnodeComponents[] = {
{"sdbref", sdbInitRef, sdbCleanUpRef},
{"profile", mnodeInitProfile, mnodeCleanupProfile}, {"profile", mnodeInitProfile, mnodeCleanupProfile},
{"cluster", mnodeInitCluster, mnodeCleanupCluster}, {"cluster", mnodeInitCluster, mnodeCleanupCluster},
{"accts", mnodeInitAccts, mnodeCleanupAccts}, {"accts", mnodeInitAccts, mnodeCleanupAccts},
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
int64_t tsMnodeRid = -1;
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell; static SRpcEpSet tsMnodeEpSetForShell;
...@@ -153,7 +154,8 @@ int32_t mnodeInitMnodes() { ...@@ -153,7 +154,8 @@ int32_t mnodeInitMnodes() {
.fpRestored = mnodeMnodeActionRestored .fpRestored = mnodeMnodeActionRestored
}; };
tsMnodeSdb = sdbOpenTable(&desc); tsMnodeRid = sdbOpenTable(&desc);
tsMnodeSdb = sdbGetTableByRid(tsMnodeRid);
if (tsMnodeSdb == NULL) { if (tsMnodeSdb == NULL) {
mError("failed to init mnodes data"); mError("failed to init mnodes data");
return -1; return -1;
...@@ -168,7 +170,7 @@ int32_t mnodeInitMnodes() { ...@@ -168,7 +170,7 @@ int32_t mnodeInitMnodes() {
} }
void mnodeCleanupMnodes() { void mnodeCleanupMnodes() {
sdbCloseTable(tsMnodeSdb); sdbCloseTable(tsMnodeRid);
tsMnodeSdb = NULL; tsMnodeSdb = NULL;
mnodeMnodeDestroyLock(); mnodeMnodeDestroyLock();
} }
...@@ -251,12 +253,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { ...@@ -251,12 +253,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForPeer; *epSet = tsMnodeEpSetForPeer;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForShell; *epSet = tsMnodeEpSetForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
char* mnodeGetMnodeMasterEp() { char* mnodeGetMnodeMasterEp() {
......
...@@ -58,16 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -58,16 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
#define QUERY_STREAM_SAVE_SIZE 20 #define QUERY_STREAM_SAVE_SIZE 20
extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL; static SCacheObj *tsMnodeConnCache = NULL;
static int32_t tsConnIndex = 0; static int32_t tsConnIndex = 0;
......
...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
mDebug("msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "hash.h" #include "hash.h"
#include "tutil.h" #include "tutil.h"
#include "tref.h"
#include "tbalance.h" #include "tbalance.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
...@@ -98,6 +99,7 @@ typedef struct { ...@@ -98,6 +99,7 @@ typedef struct {
SSdbWorker *worker; SSdbWorker *worker;
} SSdbWorkerPool; } SSdbWorkerPool;
int32_t tsSdbRid;
extern void * tsMnodeTmr; extern void * tsMnodeTmr;
static void * tsSdbTmr; static void * tsSdbTmr;
static SSdbMgmt tsSdbMgmt = {0}; static SSdbMgmt tsSdbMgmt = {0};
...@@ -118,6 +120,7 @@ static void sdbFreeQueue(); ...@@ -118,6 +120,7 @@ static void sdbFreeQueue();
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow);
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow);
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow);
static void sdbCloseTableObj(void *handle);
int32_t sdbGetId(void *pTable) { int32_t sdbGetId(void *pTable) {
return ((SSdbTable *)pTable)->autoIndex; return ((SSdbTable *)pTable)->autoIndex;
...@@ -385,6 +388,17 @@ void sdbUpdateSync(void *pMnodes) { ...@@ -385,6 +388,17 @@ void sdbUpdateSync(void *pMnodes) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
} }
int32_t sdbInitRef() {
tsSdbRid = taosOpenRef(10, sdbCloseTableObj);
if (tsSdbRid <= 0) {
sdbError("failed to init sdb ref");
return -1;
}
return 0;
}
void sdbCleanUpRef() { taosCloseRef(tsSdbRid); }
int32_t sdbInit() { int32_t sdbInit() {
pthread_mutex_init(&tsSdbMgmt.mutex, NULL); pthread_mutex_init(&tsSdbMgmt.mutex, NULL);
...@@ -423,7 +437,7 @@ void sdbCleanUp() { ...@@ -423,7 +437,7 @@ void sdbCleanUp() {
walClose(tsSdbMgmt.wal); walClose(tsSdbMgmt.wal);
tsSdbMgmt.wal = NULL; tsSdbMgmt.wal = NULL;
} }
pthread_mutex_destroy(&tsSdbMgmt.mutex); pthread_mutex_destroy(&tsSdbMgmt.mutex);
} }
...@@ -506,7 +520,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -506,7 +520,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_add_fetch_32(&pTable->autoIndex, 1); atomic_add_fetch_32(&pTable->autoIndex, 1);
} }
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg);
int32_t code = (*pTable->fpInsert)(pRow); int32_t code = (*pTable->fpInsert)(pRow);
...@@ -542,7 +556,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -542,7 +556,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_sub_fetch_32(&pTable->numOfRows, 1); atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
sdbDecRef(pTable, pRow->pObj); sdbDecRef(pTable, pRow->pObj);
...@@ -551,7 +565,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -551,7 +565,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
} }
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) {
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
(*pTable->fpUpdate)(pRow); (*pTable->fpUpdate)(pRow);
...@@ -649,7 +663,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -649,7 +663,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
return syncCode; return syncCode;
} }
sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, sdbTrace("vgId:1, sdb:%s, record from %s is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, qtypeStr[qtype],
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
...@@ -801,10 +815,10 @@ void sdbFreeIter(void *tparam, void *pIter) { ...@@ -801,10 +815,10 @@ void sdbFreeIter(void *tparam, void *pIter) {
taosHashCancelIterate(pTable->iHandle, pIter); taosHashCancelIterate(pTable->iHandle, pIter);
} }
void *sdbOpenTable(SSdbTableDesc *pDesc) { int64_t sdbOpenTable(SSdbTableDesc *pDesc) {
SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
if (pTable == NULL) return NULL; if (pTable == NULL) return -1;
pthread_mutex_init(&pTable->mutex, NULL); pthread_mutex_init(&pTable->mutex, NULL);
tstrncpy(pTable->name, pDesc->name, SDB_TABLE_LEN); tstrncpy(pTable->name, pDesc->name, SDB_TABLE_LEN);
...@@ -829,10 +843,21 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { ...@@ -829,10 +843,21 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
tsSdbMgmt.numOfTables++; tsSdbMgmt.numOfTables++;
tsSdbMgmt.tableList[pTable->id] = pTable; tsSdbMgmt.tableList[pTable->id] = pTable;
return pTable;
return taosAddRef(tsSdbRid, pTable);
}
void sdbCloseTable(int64_t rid) {
taosRemoveRef(tsSdbRid, rid);
}
void *sdbGetTableByRid(int64_t rid) {
void *handle = taosAcquireRef(tsSdbRid, rid);
taosReleaseRef(tsSdbRid, rid);
return handle;
} }
void sdbCloseTable(void *handle) { static void sdbCloseTableObj(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
if (pTable == NULL) return; if (pTable == NULL) return;
...@@ -855,6 +880,7 @@ void sdbCloseTable(void *handle) { ...@@ -855,6 +880,7 @@ void sdbCloseTable(void *handle) {
taosHashCancelIterate(pTable->iHandle, pIter); taosHashCancelIterate(pTable->iHandle, pIter);
taosHashCleanup(pTable->iHandle); taosHashCleanup(pTable->iHandle);
pTable->iHandle = NULL;
pthread_mutex_destroy(&pTable->mutex); pthread_mutex_destroy(&pTable->mutex);
sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables); sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables);
......
...@@ -52,7 +52,6 @@ static bool mnodeCheckShowFinished(SShowObj *pShow); ...@@ -52,7 +52,6 @@ static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodePutShowObj(SShowObj *pShow); static void *mnodePutShowObj(SShowObj *pShow);
static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove);
extern void *tsMnodeTmr;
static void *tsMnodeShowCache = NULL; static void *tsMnodeShowCache = NULL;
static int32_t tsShowObjIndex = 0; static int32_t tsShowObjIndex = 0;
static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
......
...@@ -49,7 +49,9 @@ ...@@ -49,7 +49,9 @@
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void * tsChildTableSdb;
int64_t tsSTableRid = -1;
static void * tsSuperTableSdb; static void * tsSuperTableSdb;
static int32_t tsChildTableUpdateSize; static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
...@@ -350,7 +352,7 @@ static int32_t mnodeInitChildTables() { ...@@ -350,7 +352,7 @@ static int32_t mnodeInitChildTables() {
SCTableObj tObj; SCTableObj tObj;
tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type;
SSdbTableDesc tableDesc = { SSdbTableDesc desc = {
.id = SDB_TABLE_CTABLE, .id = SDB_TABLE_CTABLE,
.name = "ctables", .name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
...@@ -366,7 +368,8 @@ static int32_t mnodeInitChildTables() { ...@@ -366,7 +368,8 @@ static int32_t mnodeInitChildTables() {
.fpRestored = mnodeChildTableActionRestored .fpRestored = mnodeChildTableActionRestored
}; };
tsChildTableSdb = sdbOpenTable(&tableDesc); tsCTableRid = sdbOpenTable(&desc);
tsChildTableSdb = sdbGetTableByRid(tsCTableRid);
if (tsChildTableSdb == NULL) { if (tsChildTableSdb == NULL) {
mError("failed to init child table data"); mError("failed to init child table data");
return -1; return -1;
...@@ -377,7 +380,7 @@ static int32_t mnodeInitChildTables() { ...@@ -377,7 +380,7 @@ static int32_t mnodeInitChildTables() {
} }
static void mnodeCleanupChildTables() { static void mnodeCleanupChildTables() {
sdbCloseTable(tsChildTableSdb); sdbCloseTable(tsCTableRid);
tsChildTableSdb = NULL; tsChildTableSdb = NULL;
} }
...@@ -543,7 +546,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -543,7 +546,7 @@ static int32_t mnodeInitSuperTables() {
SSTableObj tObj; SSTableObj tObj;
tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type;
SSdbTableDesc tableDesc = { SSdbTableDesc desc = {
.id = SDB_TABLE_STABLE, .id = SDB_TABLE_STABLE,
.name = "stables", .name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
...@@ -559,7 +562,8 @@ static int32_t mnodeInitSuperTables() { ...@@ -559,7 +562,8 @@ static int32_t mnodeInitSuperTables() {
.fpRestored = mnodeSuperTableActionRestored .fpRestored = mnodeSuperTableActionRestored
}; };
tsSuperTableSdb = sdbOpenTable(&tableDesc); tsSTableRid = sdbOpenTable(&desc);
tsSuperTableSdb = sdbGetTableByRid(tsSTableRid);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init stables data"); mError("failed to init stables data");
return -1; return -1;
...@@ -570,7 +574,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -570,7 +574,7 @@ static int32_t mnodeInitSuperTables() {
} }
static void mnodeCleanupSuperTables() { static void mnodeCleanupSuperTables() {
sdbCloseTable(tsSuperTableSdb); sdbCloseTable(tsSTableRid);
tsSuperTableSdb = NULL; tsSuperTableSdb = NULL;
} }
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "mnodeWrite.h" #include "mnodeWrite.h"
#include "mnodePeer.h" #include "mnodePeer.h"
int64_t tsUserRid = -1;
static void * tsUserSdb = NULL; static void * tsUserSdb = NULL;
static int32_t tsUserUpdateSize = 0; static int32_t tsUserUpdateSize = 0;
static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
...@@ -165,7 +166,8 @@ int32_t mnodeInitUsers() { ...@@ -165,7 +166,8 @@ int32_t mnodeInitUsers() {
.fpRestored = mnodeUserActionRestored .fpRestored = mnodeUserActionRestored
}; };
tsUserSdb = sdbOpenTable(&desc); tsUserRid = sdbOpenTable(&desc);
tsUserSdb = sdbGetTableByRid(tsUserRid);
if (tsUserSdb == NULL) { if (tsUserSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, failed to create hash", desc.name);
return -1; return -1;
...@@ -185,7 +187,7 @@ int32_t mnodeInitUsers() { ...@@ -185,7 +187,7 @@ int32_t mnodeInitUsers() {
} }
void mnodeCleanupUsers() { void mnodeCleanupUsers() {
sdbCloseTable(tsUserSdb); sdbCloseTable(tsUserRid);
tsUserSdb = NULL; tsUserSdb = NULL;
} }
......
...@@ -51,6 +51,7 @@ char* vgroupStatus[] = { ...@@ -51,6 +51,7 @@ char* vgroupStatus[] = {
"updating" "updating"
}; };
int64_t tsVgroupRid = -1;
static void *tsVgroupSdb = NULL; static void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0; static int32_t tsVgUpdateSize = 0;
...@@ -222,7 +223,8 @@ int32_t mnodeInitVgroups() { ...@@ -222,7 +223,8 @@ int32_t mnodeInitVgroups() {
.fpRestored = mnodeVgroupActionRestored, .fpRestored = mnodeVgroupActionRestored,
}; };
tsVgroupSdb = sdbOpenTable(&desc); tsVgroupRid = sdbOpenTable(&desc);
tsVgroupSdb = sdbGetTableByRid(tsVgroupRid);
if (tsVgroupSdb == NULL) { if (tsVgroupSdb == NULL) {
mError("failed to init vgroups data"); mError("failed to init vgroups data");
return -1; return -1;
...@@ -610,7 +612,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { ...@@ -610,7 +612,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
} }
void mnodeCleanupVgroups() { void mnodeCleanupVgroups() {
sdbCloseTable(tsVgroupSdb); sdbCloseTable(tsVgroupRid);
tsVgroupSdb = NULL; tsVgroupSdb = NULL;
} }
......
...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册