未验证 提交 1ba2f77c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9474 from taosdata/feature/dnode3

fix invalid write in sdb
...@@ -642,8 +642,6 @@ typedef struct { ...@@ -642,8 +642,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int8_t dropped;
char reserved[7];
} SDnodeCfg; } SDnodeCfg;
typedef struct { typedef struct {
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
#define SDB_GET_INT64(pData, pRow, dataPos, val) \ #define SDB_GET_INT64(pData, pRow, dataPos, val) \
{ \ { \
if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \ if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += sizeof(int64_t); \ dataPos += sizeof(int64_t); \
...@@ -34,7 +34,7 @@ extern "C" { ...@@ -34,7 +34,7 @@ extern "C" {
#define SDB_GET_INT32(pData, pRow, dataPos, val) \ #define SDB_GET_INT32(pData, pRow, dataPos, val) \
{ \ { \
if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \ if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += sizeof(int32_t); \ dataPos += sizeof(int32_t); \
...@@ -43,7 +43,7 @@ extern "C" { ...@@ -43,7 +43,7 @@ extern "C" {
#define SDB_GET_INT16(pData, pRow, dataPos, val) \ #define SDB_GET_INT16(pData, pRow, dataPos, val) \
{ \ { \
if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \ if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += sizeof(int16_t); \ dataPos += sizeof(int16_t); \
...@@ -52,7 +52,7 @@ extern "C" { ...@@ -52,7 +52,7 @@ extern "C" {
#define SDB_GET_INT8(pData, pRow, dataPos, val) \ #define SDB_GET_INT8(pData, pRow, dataPos, val) \
{ \ { \
if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += sizeof(int8_t); \ dataPos += sizeof(int8_t); \
...@@ -61,7 +61,7 @@ extern "C" { ...@@ -61,7 +61,7 @@ extern "C" {
#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \ #define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \
{ \ { \
if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += valLen; \ dataPos += valLen; \
...@@ -71,7 +71,7 @@ extern "C" { ...@@ -71,7 +71,7 @@ extern "C" {
{ \ { \
char val[valLen] = {0}; \ char val[valLen] = {0}; \
if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRow(pRow); \ tfree(pRow); \
return NULL; \ return NULL; \
} \ } \
dataPos += valLen; \ dataPos += valLen; \
...@@ -325,7 +325,7 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); ...@@ -325,7 +325,7 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow *sdbAllocRow(int32_t objSize); SSdbRow *sdbAllocRow(int32_t objSize);
void sdbFreeRow(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow);
void *sdbGetRowObj(SSdbRow *pRow); void *sdbGetRowObj(SSdbRow *pRow);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -162,16 +162,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); ...@@ -162,16 +162,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
*/ */
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/* ------------------------ SVnodeCfg ------------------------ */ /* ------------------------ SVnodeCfg ------------------------ */
/** /**
* @brief Initialize VNODE options. * @brief Initialize VNODE options.
......
...@@ -188,14 +188,21 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -188,14 +188,21 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
} }
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData; SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
tsem_post(&pRequest->body.rspSem);
return code;
}
SUseDbRsp* pUseDbRsp = (SUseDbRsp*)pMsg->pData;
SName name = {0}; SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT | T_NAME_DB);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db); tNameGetDbName(&name, db);
SRequestObj* pRequest = param;
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
......
...@@ -139,7 +139,7 @@ void dmnWaitSignal() { ...@@ -139,7 +139,7 @@ void dmnWaitSignal() {
void dmnInitOption(SDnodeOpt *pOption) { void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = 30000000; //3.0.0.0 pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportVnodes = 1; pOption->numOfSupportVnodes = 16;
pOption->numOfCommitThreads = 1; pOption->numOfCommitThreads = 1;
pOption->statusInterval = tsStatusInterval; pOption->statusInterval = tsStatusInterval;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
......
...@@ -393,13 +393,11 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -393,13 +393,11 @@ void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:% " PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId; pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId; pMgmt->clusterId = pCfg->clusterId;
pMgmt->dropped = pCfg->dropped;
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
...@@ -430,6 +428,11 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -430,6 +428,11 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1;
dndWriteDnodes(pDnode);
}
return; return;
} }
...@@ -439,11 +442,6 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -439,11 +442,6 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
pCfg->clusterId = htobe64(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) {
pMgmt->statusSent = 0;
return;
}
SDnodeEps *pDnodeEps = &pRsp->dnodeEps; SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num); pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) { for (int32_t i = 0; i < pDnodeEps->num; ++i) {
...@@ -487,7 +485,7 @@ static void *dnodeThreadRoutine(void *param) { ...@@ -487,7 +485,7 @@ static void *dnodeThreadRoutine(void *param) {
pthread_testcancel(); pthread_testcancel();
taosMsleep(ms); taosMsleep(ms);
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent) { if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) {
dndSendStatusMsg(pDnode); dndSendStatusMsg(pDnode);
} }
} }
...@@ -522,6 +520,11 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -522,6 +520,11 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
if (pMgmt->dropped) {
dError("dnode will not start for its already dropped");
return -1;
}
if (dndInitMgmtWorker(pDnode) != 0) { if (dndInitMgmtWorker(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
......
...@@ -124,12 +124,8 @@ typedef struct { ...@@ -124,12 +124,8 @@ typedef struct {
int64_t rebootTime; int64_t rebootTime;
int64_t lastAccessTime; int64_t lastAccessTime;
int32_t accessTimes; int32_t accessTimes;
int16_t numOfMnodes;
int16_t numOfVnodes; int16_t numOfVnodes;
int16_t numOfQnodes;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
int16_t numOfCores; int16_t numOfCores;
EDndStatus status; EDndStatus status;
EDndReason offlineReason; EDndReason offlineReason;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "mndDef.h" #include "mndDef.h"
#include "sdb.h" #include "sdb.h"
#include "tcache.h" #include "tcache.h"
#include "tep.h"
#include "tqueue.h" #include "tqueue.h"
#include "ttime.h" #include "ttime.h"
......
...@@ -29,6 +29,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); ...@@ -29,6 +29,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
......
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tep.h" #include "mndVgroup.h"
#include "ttime.h"
#define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_VER_NUMBER 1
#define TSDB_DNODE_RESERVE_SIZE 64 #define TSDB_DNODE_RESERVE_SIZE 64
...@@ -370,7 +369,6 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -370,7 +369,6 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
} }
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
...@@ -700,7 +698,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i ...@@ -700,7 +698,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->numOfVnodes; *(int16_t *)pWrite = mndGetVnodesNum(pMnode, pDnode->id);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -118,17 +118,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, ...@@ -118,17 +118,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, data:%p failed to put into cache since %s, user:%s", connId, pConn, pInfo->user, terrstr()); mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr());
return NULL; return NULL;
} else { } else {
mTrace("conn:%d, data:%p created, user:%s", pConn->id, pConn, pInfo->user); mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
return pConn; return pConn;
} }
} }
static void mndFreeConn(SConnObj *pConn) { static void mndFreeConn(SConnObj *pConn) {
tfree(pConn->pQueries); tfree(pConn->pQueries);
mTrace("conn:%d, data:%p destroyed", pConn->id, pConn); mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
} }
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
...@@ -143,13 +143,13 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { ...@@ -143,13 +143,13 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
int32_t keepTime = pMnode->cfg.shellActivityTimer * 3; int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn); mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
return pConn; return pConn;
} }
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
if (pConn == NULL) return; if (pConn == NULL) return;
mTrace("conn:%d, data:%p released from cache", pConn->id, pConn); mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
......
...@@ -56,31 +56,24 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) { ...@@ -56,31 +56,24 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) {
if (showId == 0) atomic_add_fetch_32(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_32(&pMgmt->showId, 1);
int32_t size = sizeof(SShowObj) + pMsg->payloadLen; int32_t size = sizeof(SShowObj) + pMsg->payloadLen;
SShowObj *pShow = calloc(1, size); SShowObj showObj = {0};
if (pShow != NULL) { showObj.id = showId;
pShow->id = showId; showObj.pMnode = pMnode;
pShow->pMnode = pMnode; showObj.type = pMsg->type;
pShow->type = pMsg->type; showObj.payloadLen = pMsg->payloadLen;
pShow->payloadLen = pMsg->payloadLen; memcpy(showObj.db, pMsg->db, TSDB_DB_FNAME_LEN);
memcpy(pShow->db, pMsg->db, TSDB_DB_FNAME_LEN); memcpy(showObj.payload, pMsg->payload, pMsg->payloadLen);
memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen);
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta msg:%s since %s", mndShowStr(pMsg->type), terrstr());
return NULL;
}
int32_t keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000; int32_t keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000;
SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime); SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), &showObj, size, keepTime);
free(pShow); if (pShow == NULL) {
if (pShowRet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to put into cache since %s", showId, terrstr()); mError("show:%d, failed to put into cache since %s", showId, terrstr());
return NULL; return NULL;
} else {
mTrace("show:%d, data:%p created", showId, pShowRet);
return pShowRet;
} }
mTrace("show:%d, is created, data:%p", showId, pShow);
return pShow;
} }
static void mndFreeShowObj(SShowObj *pShow) { static void mndFreeShowObj(SShowObj *pShow) {
...@@ -94,7 +87,7 @@ static void mndFreeShowObj(SShowObj *pShow) { ...@@ -94,7 +87,7 @@ static void mndFreeShowObj(SShowObj *pShow) {
} }
} }
mTrace("show:%d, data:%p destroyed", pShow->id, pShow); mTrace("show:%d, is destroyed, data:%p", pShow->id, pShow);
} }
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) { static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) {
...@@ -106,13 +99,13 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) { ...@@ -106,13 +99,13 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) {
return NULL; return NULL;
} }
mTrace("show:%d, data:%p acquired from cache", pShow->id, pShow); mTrace("show:%d, acquired from cache, data:%p", pShow->id, pShow);
return pShow; return pShow;
} }
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
if (pShow == NULL) return; if (pShow == NULL) return;
mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove); mTrace("show:%d, released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);
// A bug in tcache.c // A bug in tcache.c
forceRemove = 0; forceRemove = 0;
...@@ -158,8 +151,8 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { ...@@ -158,8 +151,8 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
} }
int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta); int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
mDebug("show:%d, data:%p get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow, mDebug("show:%d, get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow->numOfRows,
pShow->numOfRows, pShow->numOfColumns, mndShowStr(type), tstrerror(code)); pShow->numOfColumns, mndShowStr(type), tstrerror(code));
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
...@@ -195,16 +188,15 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -195,16 +188,15 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if (retrieveFp == NULL) { if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr()); mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1; return -1;
} }
mDebug("show:%d, data:%p start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow, mDebug("show:%d, start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow->numOfReads,
pShow->numOfReads, pShow->numOfRows, mndShowStr(pShow->type)); pShow->numOfRows, mndShowStr(pShow->type));
if (mndCheckRetrieveFinished(pShow)) { if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:%d, data:%p read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow, pShow->numOfReads, mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows);
pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows; pShow->numOfReads = pShow->numOfRows;
} }
...@@ -227,7 +219,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -227,7 +219,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr()); mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1; return -1;
} }
...@@ -236,7 +228,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -236,7 +228,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead); rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead);
} }
mDebug("show:%d, data:%p stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, pShow, rowsRead, rowsToRead); mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
...@@ -246,10 +238,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { ...@@ -246,10 +238,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1; pRsp->completed = 1;
mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow); mDebug("show:%d, retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);
} else { } else {
mDebug("show:%d, data:%p retrieve not completed yet", pShow->id, pShow); mDebug("show:%d, retrieve not completed yet", pShow->id);
mndReleaseShowObj(pShow, false); mndReleaseShowObj(pShow, false);
} }
......
...@@ -294,18 +294,18 @@ TRANS_DECODE_OVER: ...@@ -294,18 +294,18 @@ TRANS_DECODE_OVER:
return NULL; return NULL;
} }
mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw); mTrace("trans:%d, decode from raw:%p, data:%p", pTrans->id, pRaw, pTrans);
return pRow; return pRow;
} }
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action", pTrans->id); mTrace("trans:%d, perform insert action, data:%p", pTrans->id, pTrans);
return 0; return 0;
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action", pTrans->id); mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans);
mndTransDropLogs(pTrans->redoLogs); mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs); mndTransDropLogs(pTrans->undoLogs);
...@@ -317,7 +317,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { ...@@ -317,7 +317,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
} }
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
mTrace("trans:%d, perform update action", pOldTrans->id); mTrace("trans:%d, perform update action, data:%p", pOldTrans->id, pOldTrans);
pOldTrans->stage = pNewTrans->stage; pOldTrans->stage = pNewTrans->stage;
return 0; return 0;
} }
...@@ -362,14 +362,14 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { ...@@ -362,14 +362,14 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
return NULL; return NULL;
} }
mDebug("trans:%d, is created", pTrans->id); mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
static void mndTransDropLogs(SArray *pArray) { static void mndTransDropLogs(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) { for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
tfree(pRaw); sdbFreeRaw(pRaw);
} }
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
...@@ -391,7 +391,7 @@ void mndTransDrop(STrans *pTrans) { ...@@ -391,7 +391,7 @@ void mndTransDrop(STrans *pTrans) {
mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions); mndTransDropActions(pTrans->undoActions);
// mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
tfree(pTrans); tfree(pTrans);
} }
...@@ -442,7 +442,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -442,7 +442,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("trans:%d, sync to other nodes", pTrans->id); mDebug("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw); int32_t code = mndSyncPropose(pMnode, pRaw);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
...@@ -450,7 +450,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -450,7 +450,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return -1; return -1;
} }
mTrace("trans:%d, sync finished", pTrans->id); mDebug("trans:%d, sync finished", pTrans->id);
code = sdbWrite(pMnode->pSdb, pRaw); code = sdbWrite(pMnode->pSdb, pRaw);
if (code != 0) { if (code != 0) {
......
...@@ -86,7 +86,6 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { ...@@ -86,7 +86,6 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId) SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId)
SDB_SET_INT8(pRaw, dataPos, pVgid->role)
} }
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE) SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos); SDB_SET_DATALEN(pRaw, dataPos);
...@@ -121,7 +120,6 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { ...@@ -121,7 +120,6 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId) SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pVgid->role)
} }
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE) SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
...@@ -237,44 +235,95 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p ...@@ -237,44 +235,95 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
return pDrop; return pDrop;
} }
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) { static SArray *mndBuildDnodesArray(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0; int32_t numOfDnodes = mndGetDnodeSize(pMnode);
void *pIter = NULL; SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
while (allocedVnodes < pVgroup->replica) { void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break; if (pIter == NULL) break;
// todo int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if (mndIsDnodeInReadyStatus(pMnode, pDnode)) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[allocedVnodes]; bool isMnode = mndIsMnode(pMnode, pDnode->id);
pVgid->dnodeId = pDnode->id; if (isMnode) {
if (pVgroup->replica == 1) { pDnode->numOfVnodes++;
pVgid->role = TAOS_SYNC_STATE_LEADER;
} else {
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
} }
allocedVnodes++;
bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode);
if (isReady) {
taosArrayPush(pArray, pDnode);
} }
mDebug("dnode:%d, numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, isReady);
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }
if (allocedVnodes != pVgroup->replica) { return pArray;
}
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;
return d1Score > d2Score ? 0 : 1;
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0;
void *pIter = NULL;
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = taosArrayGet(pArray, v);
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1; return -1;
} }
pVgid->dnodeId = pDnode->id;
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
} else {
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
}
mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
pDnode->numOfVnodes++;
}
return 0; return 0;
} }
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
SVgObj *pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj)); int32_t code = -1;
SArray *pArray = NULL;
SVgObj *pVgroups = NULL;
pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) { if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; goto ALLOC_VGROUP_OVER;
} }
pArray = mndBuildDnodesArray(pMnode);
if (pArray == NULL) {
goto ALLOC_VGROUP_OVER;
}
mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
int32_t allocedVgroups = 0; int32_t allocedVgroups = 0;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
uint32_t hashMin = 0; uint32_t hashMin = 0;
...@@ -298,17 +347,23 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { ...@@ -298,17 +347,23 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->dbUid = pDb->uid; pVgroup->dbUid = pDb->uid;
pVgroup->replica = pDb->cfg.replications; pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, pVgroup) != 0) { if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
free(pVgroups); goto ALLOC_VGROUP_OVER;
return -1;
} }
allocedVgroups++; allocedVgroups++;
} }
*ppVgroups = pVgroups; *ppVgroups = pVgroups;
return 0; code = 0;
mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
ALLOC_VGROUP_OVER:
if (code != 0) free(pVgroups);
taosArrayDestroy(pArray);
return code;
} }
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
...@@ -348,6 +403,7 @@ static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { ...@@ -348,6 +403,7 @@ static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
...@@ -478,7 +534,7 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) { ...@@ -478,7 +534,7 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
static int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
void *pIter = NULL; void *pIter = NULL;
......
...@@ -146,15 +146,15 @@ static int32_t mndInitSteps(SMnode *pMnode) { ...@@ -146,15 +146,15 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (pMnode->clusterId <= 0) { if (pMnode->clusterId <= 0) {
if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
......
...@@ -72,6 +72,7 @@ typedef struct SSdb { ...@@ -72,6 +72,7 @@ typedef struct SSdb {
} SSdb; } SSdb;
int32_t sdbWriteFile(SSdb *pSdb); int32_t sdbWriteFile(SSdb *pSdb);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -80,16 +80,12 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -80,16 +80,12 @@ void sdbCleanup(SSdb *pSdb) {
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue; if (hash == NULL) continue;
SdbDeleteFp deleteFp = pSdb->deleteFps[i];
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue; if (pRow == NULL) continue;
if (deleteFp != NULL) { sdbFreeRow(pSdb, pRow);
(*deleteFp)(pSdb, pRow->pObj);
}
sdbFreeRow(pRow);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
} }
......
...@@ -151,7 +151,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -151,7 +151,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow); taosHashCancelIterate(hash, ppRow);
free(pRaw); sdbFreeRaw(pRaw);
break; break;
} }
...@@ -159,7 +159,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -159,7 +159,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow); taosHashCancelIterate(hash, ppRow);
free(pRaw); sdbFreeRaw(pRaw);
break; break;
} }
} else { } else {
...@@ -168,7 +168,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -168,7 +168,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
break; break;
} }
free(pRaw); sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
......
...@@ -16,6 +16,50 @@ ...@@ -16,6 +16,50 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static const char *sdbTableName(ESdbType type) {
switch (type) {
case SDB_TRANS:
return "trans";
case SDB_CLUSTER:
return "cluster";
case SDB_MNODE:
return "mnode";
case SDB_DNODE:
return "dnode";
case SDB_USER:
return "user";
case SDB_AUTH:
return "auth";
case SDB_ACCT:
return "acct";
case SDB_TOPIC:
return "topic";
case SDB_VGROUP:
return "vgId";
case SDB_STB:
return "stb";
case SDB_DB:
return "db";
case SDB_FUNC:
return "func";
default:
return "undefine";
}
}
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
EKeyType keyType = pSdb->keyTypes[pRow->type];
if (keyType == SDB_KEY_BINARY) {
mTrace("%s:%s, refCount:%d oper:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper);
} else if (keyType == SDB_KEY_INT32) {
mTrace("%s:%d, refCount:%d oper:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, oper);
} else if (keyType == SDB_KEY_INT64) {
mTrace("%s:%" PRId64 ", refCount:%d oper:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, pRow->refCount, oper);
} else {
}
}
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
if (type >= SDB_MAX || type <= SDB_START) { if (type >= SDB_MAX || type <= SDB_START) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
...@@ -55,17 +99,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -55,17 +99,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) { if (pOldRow != NULL) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pRow); sdbFreeRow(pSdb, pRow);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
} }
pRow->refCount = 1; pRow->refCount = 0;
pRow->status = pRaw->status; pRow->status = pRaw->status;
sdbPrintOper(pSdb, pRow, "insertRow");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pRow); sdbFreeRow(pSdb, pRow);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
} }
...@@ -83,7 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -83,7 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWLockLatch(pLock); taosWLockLatch(pLock);
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pRow); sdbFreeRow(pSdb, pRow);
terrno = code; terrno = code;
return terrno; return terrno;
} }
...@@ -113,7 +158,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -113,7 +158,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
sdbFreeRow(pNewRow); sdbFreeRow(pSdb, pNewRow);
return code; return code;
} }
...@@ -123,14 +168,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -123,14 +168,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
// remove attached object such as trans
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) (*deleteFp)(pSdb, pRow->pObj);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pRow); sdbFreeRow(pSdb, pRow);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno; return terrno;
} }
...@@ -140,8 +181,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -140,8 +181,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbRelease(pSdb, pOldRow->pObj); // sdbRelease(pSdb, pOldRow->pObj);
sdbFreeRow(pRow); sdbFreeRow(pSdb, pRow);
return code; return code;
} }
...@@ -206,6 +247,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { ...@@ -206,6 +247,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
case SDB_STATUS_UPDATING: case SDB_STATUS_UPDATING:
atomic_add_fetch_32(&pRow->refCount, 1); atomic_add_fetch_32(&pRow->refCount, 1);
pRet = pRow->pObj; pRet = pRow->pObj;
sdbPrintOper(pSdb, pRow, "acquireRow");
break; break;
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
terrno = TSDB_CODE_SDB_OBJ_CREATING; terrno = TSDB_CODE_SDB_OBJ_CREATING;
...@@ -232,13 +274,9 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -232,13 +274,9 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
taosRLockLatch(pLock); taosRLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "releaseRow");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type]; sdbFreeRow(pSdb, pRow);
if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbFreeRow(pRow);
} }
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
...@@ -255,9 +293,9 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -255,9 +293,9 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
if (pIter != NULL) { if (pIter != NULL) {
SSdbRow *pLastRow = *(SSdbRow **)pIter; SSdbRow *pLastRow = *(SSdbRow **)pIter;
int32_t ref = atomic_sub_fetch_32(&pLastRow->refCount, 1); int32_t ref = atomic_load_32(&pLastRow->refCount);
if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pLastRow); sdbFreeRow(pSdb, pLastRow);
} }
} }
...@@ -270,6 +308,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -270,6 +308,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
} }
atomic_add_fetch_32(&pRow->refCount, 1); atomic_add_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "fetchRow");
*ppObj = pRow->pObj; *ppObj = pRow->pObj;
break; break;
} }
......
...@@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { ...@@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw->sver = sver; pRaw->sver = sver;
pRaw->dataLen = dataLen; pRaw->dataLen = dataLen;
// mTrace("raw:%p, is created, len:%d", pRaw, dataLen); mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
return pRaw; return pRaw;
} }
void sdbFreeRaw(SSdbRaw *pRaw) { void sdbFreeRaw(SSdbRaw *pRaw) {
// mTrace("raw:%p, is freed", pRaw); mTrace("raw:%p, is freed", pRaw);
free(pRaw); free(pRaw);
} }
......
...@@ -35,4 +35,13 @@ void *sdbGetRowObj(SSdbRow *pRow) { ...@@ -35,4 +35,13 @@ void *sdbGetRowObj(SSdbRow *pRow) {
return pRow->pObj; return pRow->pObj;
} }
void sdbFreeRow(SSdbRow *pRow) { tfree(pRow); } void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow) {
// remove attached object such as trans
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbPrintOper(pSdb, pRow, "freeRow");
tfree(pRow);
}
...@@ -28,8 +28,3 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -28,8 +28,3 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("sync message is processed"); vInfo("sync message is processed");
return 0; return 0;
} }
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("consume message is processed");
return 0;
}
...@@ -59,9 +59,29 @@ if $data03 != 0 then ...@@ -59,9 +59,29 @@ if $data03 != 0 then
endi endi
print =============== show vgroups print =============== show vgroups
sql show databases
if $rows != 1 then
return -1
endi
sql_error use d1
sql use d4 sql use d4
sql show vgroups
if $rows == 0 then if $rows != 2 then
return -1
endi
print =============== show dnodes
sql show dnodes
if $data00 != 1 then
return -1
endi
if $data02 != 2 then
return -1 return -1
endi endi
......
...@@ -23,7 +23,7 @@ endi ...@@ -23,7 +23,7 @@ endi
print $data00 $data01 $data02 print $data00 $data01 $data02
sql create table st2 (ts timestamp, i float) tags (j bigint) sql create table st2 (ts timestamp, i float) tags (j int)
sql show stables sql show stables
if $rows != 2 then if $rows != 2 then
return -1 return -1
...@@ -39,15 +39,14 @@ if $rows != 1 then ...@@ -39,15 +39,14 @@ if $rows != 1 then
return -1 return -1
endi endi
print -->
print $data00 $data01 $data02 print $data00 $data01 $data02
print $data10 $data11 $data12 print $data10 $data11 $data12
return
print =============== create child table print =============== create child table
sql create table c1 using st tags(1) sql create table c1 using st tags(1)
sql create table c2 using st tags(2) sql create table c2 using st tags(2)
return
sql show tables sql show tables
if $rows != 2 then if $rows != 2 then
return -1 return -1
......
...@@ -17,7 +17,7 @@ OS_TYPE=`$UNAME_BIN` ...@@ -17,7 +17,7 @@ OS_TYPE=`$UNAME_BIN`
NODE_NAME= NODE_NAME=
EXEC_OPTON= EXEC_OPTON=
CLEAR_OPTION="false" CLEAR_OPTION="false"
while getopts "n:s:u:x:ct" arg while getopts "n:s:u:x:cv" arg
do do
case $arg in case $arg in
n) n)
...@@ -29,7 +29,7 @@ do ...@@ -29,7 +29,7 @@ do
c) c)
CLEAR_OPTION="clear" CLEAR_OPTION="clear"
;; ;;
t) v)
SHELL_OPTION="true" SHELL_OPTION="true"
;; ;;
u) u)
......
...@@ -2,20 +2,19 @@ system sh/stop_dnodes.sh ...@@ -2,20 +2,19 @@ system sh/stop_dnodes.sh
############## config parameter ##################### ############## config parameter #####################
$node1 = 192.168.101.174 $node1 = 192.168.0.201
$node2 = 192.168.0.202 $node2 = 192.168.0.202
$node2 = 192.168.0.203 $node3 = 192.168.0.203
$node3 = 192.168.0.204 $node4 = 192.168.0.204
$first = 1
$num = 5
$self = $node1 $self = $node1
$num = 25
############### deploy firstEp ##################### ############### deploy firstEp #####################
$firstEp = $node1 . :7100 $firstEp = $node1 . :7100
$firstPort = 7100 $firstPort = 7100
if $first == 1 then if $self == $node1 then
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
...@@ -28,7 +27,7 @@ if $first == 1 then ...@@ -28,7 +27,7 @@ if $first == 1 then
$i = 0 $i = 0
while $i < $num while $i < $num
$port = $i * 100 $port = $i * 100
$port = $port + 8000 $port = $port + 8100
$i = $i + 1 $i = $i + 1
sql create dnode $node1 port $port sql create dnode $node1 port $port
endw endw
...@@ -36,7 +35,7 @@ if $first == 1 then ...@@ -36,7 +35,7 @@ if $first == 1 then
$i = 0 $i = 0
while $i < $num while $i < $num
$port = $i * 100 $port = $i * 100
$port = $port + 8000 $port = $port + 8100
$i = $i + 1 $i = $i + 1
sql create dnode $node2 port $port sql create dnode $node2 port $port
endw endw
...@@ -44,7 +43,7 @@ if $first == 1 then ...@@ -44,7 +43,7 @@ if $first == 1 then
$i = 0 $i = 0
while $i < $num while $i < $num
$port = $i * 100 $port = $i * 100
$port = $port + 8000 $port = $port + 8100
$i = $i + 1 $i = $i + 1
sql create dnode $node3 port $port sql create dnode $node3 port $port
endw endw
...@@ -52,7 +51,7 @@ if $first == 1 then ...@@ -52,7 +51,7 @@ if $first == 1 then
$i = 0 $i = 0
while $i < $num while $i < $num
$port = $i * 100 $port = $i * 100
$port = $port + 8000 $port = $port + 8100
$i = $i + 1 $i = $i + 1
sql create dnode $node4 port $port sql create dnode $node4 port $port
endw endw
...@@ -64,7 +63,7 @@ $i = 0 ...@@ -64,7 +63,7 @@ $i = 0
while $i < $num while $i < $num
$index = $i + 80 $index = $i + 80
$port = $i * 100 $port = $i * 100
$port = $port + 8000 $port = $port + 8100
$dnodename = dnode . $index $dnodename = dnode . $index
$i = $i + 1 $i = $i + 1
...@@ -74,5 +73,5 @@ while $i < $num ...@@ -74,5 +73,5 @@ while $i < $num
system sh/cfg.sh -n $dnodename -c fqdn -v $self system sh/cfg.sh -n $dnodename -c fqdn -v $self
system sh/cfg.sh -n $dnodename -c serverPort -v $port system sh/cfg.sh -n $dnodename -c serverPort -v $port
#system sh/exec.sh -n $dnodename -s start system sh/exec.sh -n $dnodename -s start
endw endw
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册