提交 e7d408c9 编写于 作者: H hjxilinx

[td-98] merge develop branch

...@@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
char msg[512] = {0}; char msg[512] = {0};
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) {
snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "vnode.h" #include "vnode.h"
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes();
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
...@@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { ...@@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() {
return -1; return -1;
} }
if ( vnodeInitModule() != TSDB_CODE_SUCCESS) {
return -1;
}
int32_t code = dnodeOpenVnodes(); int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return -1; return -1;
...@@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { ...@@ -88,7 +85,7 @@ void dnodeCleanupMgmt() {
tsDnodeTmr = NULL; tsDnodeTmr = NULL;
} }
vnodeCleanupModule(); dnodeCloseVnodes();
} }
void dnodeMgmt(SRpcMsg *pMsg) { void dnodeMgmt(SRpcMsg *pMsg) {
...@@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { ...@@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static int32_t dnodeOpenVnodes() { static int dnodeGetVnodeList(int32_t vnodeList[]) {
DIR *dir = opendir(tsVnodeDir); DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) { if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS; return TSDB_CODE_NO_WRITE_ACCESS;
...@@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { ...@@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() {
int32_t vnode = atoi(de->d_name + 5); int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue; if (vnode == 0) continue;
char vnodeDir[TSDB_FILENAME_LEN * 3]; vnodeList[numOfVnodes] = vnode;
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); numOfVnodes++;
int32_t code = vnodeOpen(vnode, vnodeDir);
if (code == 0) {
numOfVnodes++;
}
} }
} }
closedir(dir); closedir(dir);
dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); return numOfVnodes;
return TSDB_CODE_SUCCESS; }
static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3];
int failed = 0;
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
}
free(vnodeList);
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i)
vnodeClose(vnodeList[i]);
free(vnodeList);
dPrint("total vnodes:%d are all closed", numOfVnodes);
} }
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -142,6 +163,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -142,6 +163,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.commitLog = pCreate->cfg.commitLog;
return vnodeCreate(pCreate); return vnodeCreate(pCreate);
} }
......
...@@ -25,13 +25,10 @@ typedef struct { ...@@ -25,13 +25,10 @@ typedef struct {
void *rsp; void *rsp;
} SRspRet; } SRspRet;
int32_t vnodeInitModule();
void vnodeCleanupModule();
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vnode, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeClose(void *pVnode); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId); void* vnodeGetVnode(int32_t vgId);
......
...@@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { ...@@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
} }
static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { if (pCreate->commitLog < 0 || pCreate->commitLog > 2) {
mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
......
...@@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime); pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->commitLog = pCfg->commitLog;
pCfg->blocksPerTable = htons(pCfg->blocksPerTable); pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
......
...@@ -1567,8 +1567,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1567,8 +1567,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tfree(pRuntimeEnv->pQuery);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
...@@ -2247,11 +2245,8 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { ...@@ -2247,11 +2245,8 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) { if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList); // size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) { for (int32_t j = 0; j < 0; ++j) {
...@@ -5974,6 +5969,8 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5974,6 +5969,8 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
taosArrayDestroy(pQInfo->pTableIdList); taosArrayDestroy(pQInfo->pTableIdList);
dTrace("QInfo:%p QInfo is freed", pQInfo); dTrace("QInfo:%p QInfo is freed", pQInfo);
......
...@@ -828,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -828,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->inType) { if (pConn->inType) {
// if there are pending request, notify the app // if there are pending request, notify the app
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
/*
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType; rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
// (*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg);
*/
} }
rpcCloseConn(pConn); rpcCloseConn(pConn);
...@@ -1157,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -1157,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
if (pConn->inType && pRpc->cfp) { if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app // if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
/*
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType; rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
// (*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg);
*/
} }
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
......
...@@ -632,7 +632,7 @@ static void doInitGlobalConfig() { ...@@ -632,7 +632,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
0, 1, 0, TSDB_CFG_UTYPE_NONE); 0, 2, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
0, 2, 0, TSDB_CFG_UTYPE_NONE); 0, 2, 0, TSDB_CFG_UTYPE_NONE);
......
...@@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; ...@@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param); static void vnodeBuildVloadMsg(char *pNode, void * param);
int32_t vnodeInitModule() { static int tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() {
vnodeInitWriteFp(); vnodeInitWriteFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) { if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list"); dError("failed to init vnode list");
return -1;
} }
return 0;
}
typedef void (*CleanupFp)(char *);
void vnodeCleanupModule() {
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
taosCleanUpIntHash(tsDnodeVnodesHash);
} }
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
...@@ -93,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -93,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return code; return code;
} }
dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code; return code;
...@@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) { int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj vnodeObj = {0}; SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode; vnodeObj.vgId = vnode;
...@@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->status = VN_STATUS_READY; pVnode->status = VN_STATUS_READY;
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
tsOpennedVnodes++;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeClose(void *param) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *)param;
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) return 0;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_CLOSING; pVnode->status = VN_STATUS_CLOSING;
...@@ -183,6 +182,13 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -183,6 +182,13 @@ void vnodeRelease(void *pVnodeRaw) {
} }
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
tsOpennedVnodes--;
if (tsOpennedVnodes <= 0) {
taosCleanUpIntHash(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT;
tsDnodeVnodesHash = NULL;
}
} }
void *vnodeGetVnode(int32_t vgId) { void *vnodeGetVnode(int32_t vgId) {
...@@ -235,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -235,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
} }
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
if (pVnode->status == VN_STATUS_DELETING) { taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
// fix deadlock occured while close system
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
//syncStop(pVnode->sync); //syncStop(pVnode->sync);
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册