提交 8527587b 编写于 作者: S slguan

[TBASE-1186]

上级 b1bb894e
...@@ -134,6 +134,7 @@ extern "C" { ...@@ -134,6 +134,7 @@ extern "C" {
#define TSDB_CODE_INVALID_SUBMIT_MSG 113 #define TSDB_CODE_INVALID_SUBMIT_MSG 113
#define TSDB_CODE_NOT_ACTIVE_TABLE 114 #define TSDB_CODE_NOT_ACTIVE_TABLE 114
#define TSDB_CODE_INVALID_TABLE_ID 115 #define TSDB_CODE_INVALID_TABLE_ID 115
#define TSDB_CODE_INVALID_VNODE_STATUS 116
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,22 +44,40 @@ extern "C" { ...@@ -44,22 +44,40 @@ extern "C" {
#define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_MICRO_STR "us"
enum _status { enum _vnode_status {
TSDB_STATUS_OFFLINE, TSDB_VNODE_STATUS_OFFLINE,
TSDB_STATUS_CREATING, TSDB_VNODE_STATUS_CREATING,
TSDB_STATUS_UNSYNCED, TSDB_VNODE_STATUS_UNSYNCED,
TSDB_STATUS_SLAVE, TSDB_VNODE_STATUS_SLAVE,
TSDB_STATUS_MASTER, TSDB_VNODE_STATUS_MASTER,
TSDB_STATUS_READY, TSDB_VNODE_STATUS_CLOSING,
TSDB_VNODE_STATUS_DELETING,
}; };
enum _syncstatus { enum _vnode_sync_status {
STDB_SSTATUS_INIT, STDB_SSTATUS_INIT,
TSDB_SSTATUS_SYNCING, TSDB_SSTATUS_SYNCING,
TSDB_SSTATUS_SYNC_CACHE, TSDB_SSTATUS_SYNC_CACHE,
TSDB_SSTATUS_SYNC_FILE, TSDB_SSTATUS_SYNC_FILE,
}; };
enum _dnode_status {
TSDB_DNODE_STATUS_OFFLINE,
TSDB_DNODE_STATUS_READY
};
enum _dnode_balance_status {
LB_DNODE_STATE_BALANCED,
LB_DNODE_STATE_BALANCING,
LB_DNODE_STATE_OFFLINE_REMOVING,
LB_DNODE_STATE_SHELL_REMOVING
};
enum _vgroup_status {
LB_VGROUP_STATE_READY,
LB_VGROUP_STATE_UPDATE
};
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes #define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte #define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes #define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
......
...@@ -20,10 +20,11 @@ ...@@ -20,10 +20,11 @@
extern "C" { extern "C" {
#endif #endif
extern char *sdbDnodeStatusStr[]; const char* taosGetVnodeStatusStr(int vnodeStatus);
extern char *sdbDnodeBalanceStateStr[]; const char* taosGetDnodeStatusStr(int dnodeStatus);
extern char *sdbVnodeDropStateStr[]; const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus);
extern char *sdbVnodeSyncStatusStr[]; const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
const char* taosGetVnodeDropStatusStr(int dropping);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -354,6 +354,8 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) { ...@@ -354,6 +354,8 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
STaosRpc * pServer = (STaosRpc *)handle; STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann; SRpcChann *pChann;
tTrace("cid:%d, handle:%p open rpc chann", cid, handle);
if (pServer == NULL) return -1; if (pServer == NULL) return -1;
if (cid >= pServer->numOfChanns || cid < 0) { if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns); tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
...@@ -402,6 +404,8 @@ void taosCloseRpcChann(void *handle, int cid) { ...@@ -402,6 +404,8 @@ void taosCloseRpcChann(void *handle, int cid) {
STaosRpc * pServer = (STaosRpc *)handle; STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann; SRpcChann *pChann;
tTrace("cid:%d, handle:%p close rpc chann", cid, handle);
if (pServer == NULL) return; if (pServer == NULL) return;
if (cid >= pServer->numOfChanns || cid < 0) { if (cid >= pServer->numOfChanns || cid < 0) {
tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns); tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
......
...@@ -240,4 +240,5 @@ char *tsError[] = {"success", ...@@ -240,4 +240,5 @@ char *tsError[] = {"success",
"invalid submit message", "invalid submit message",
"not active table(not created yet or deleted already)", //114 "not active table(not created yet or deleted already)", //114
"invalid table id", "invalid table id",
"invalid vnode status", //116
}; };
...@@ -28,15 +28,6 @@ extern "C" { ...@@ -28,15 +28,6 @@ extern "C" {
#include "tstatus.h" #include "tstatus.h"
#include "ttime.h" #include "ttime.h"
enum {
LB_DNODE_STATE_BALANCED,
LB_DNODE_STATE_BALANCING,
LB_DNODE_STATE_OFFLINE_REMOVING,
LB_DNODE_STATE_SHELL_REMOVING
};
enum { LB_VGROUP_STATE_READY, LB_VGROUP_STATE_UPDATE };
void mgmtCreateDnodeOrderList(); void mgmtCreateDnodeOrderList();
void mgmtReleaseDnodeOrderList(); void mgmtReleaseDnodeOrderList();
......
...@@ -92,7 +92,7 @@ typedef struct { ...@@ -92,7 +92,7 @@ typedef struct {
SVPeerDesc vpeers[TSDB_VNODES_SUPPORT]; SVPeerDesc vpeers[TSDB_VNODES_SUPPORT];
SVnodePeer * peerInfo[TSDB_VNODES_SUPPORT]; SVnodePeer * peerInfo[TSDB_VNODES_SUPPORT];
char selfIndex; char selfIndex;
char status; char vnodeStatus;
char accessState; // Vnode access state, Readable/Writable char accessState; // Vnode access state, Readable/Writable
char syncStatus; char syncStatus;
char commitInProcess; char commitInProcess;
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "vnodeMgmt.h" #include "vnodeMgmt.h"
#include "vnodeSystem.h" #include "vnodeSystem.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "tstatus.h"
SMgmtObj mgmtObj; SMgmtObj mgmtObj;
extern uint64_t tsCreatedTime; extern uint64_t tsCreatedTime;
...@@ -330,7 +331,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -330,7 +331,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return -1; return -1;
} }
if (vnodeList[vnode].status == TSDB_STATUS_CREATING) { if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_CREATING) {
dTrace("vid:%d, vnode is still under creating", vnode); dTrace("vid:%d, vnode is still under creating", vnode);
return 0; return 0;
} }
...@@ -359,13 +360,23 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -359,13 +360,23 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
} }
if (vnodeList[vnode].cfg.maxSessions == 0) { if (vnodeList[vnode].cfg.maxSessions == 0) {
dTrace("vid:%d, vnode is empty", vnode);
if (pCfg->maxSessions > 0) { if (pCfg->maxSessions > 0) {
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc); if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_OFFLINE) {
dTrace("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc);
} else {
dTrace("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS;
}
} }
} else { } else {
dTrace("vid:%d, vnode is not empty", vnode);
if (pCfg->maxSessions > 0) { if (pCfg->maxSessions > 0) {
dTrace("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
/*
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) { if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeCleanUpOneVnode(vnode); vnodeCleanUpOneVnode(vnode);
} }
vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc); vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc);
...@@ -376,7 +387,10 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -376,7 +387,10 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions; vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions;
vnodeOpenVnode(vnode); vnodeOpenVnode(vnode);
} }
*/
return 0;
} else { } else {
dTrace("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
vnodeRemoveVnode(vnode); vnodeRemoveVnode(vnode);
} }
} }
...@@ -434,11 +448,11 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -434,11 +448,11 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
pFree->vnode = htons(pFree->vnode); pFree->vnode = htons(pFree->vnode);
if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) { if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) {
dWarn("vid:%d out of range", pFree->vnode); dWarn("vid:%d, out of range", pFree->vnode);
return -1; return -1;
} }
dTrace("vid:%d receive free vnode message", pFree->vnode); dTrace("vid:%d, receive free vnode message", pFree->vnode);
int32_t code = vnodeRemoveVnode(pFree->vnode); int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
......
...@@ -141,11 +141,10 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) { ...@@ -141,11 +141,10 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) {
if (pCreate->cacheNumOfBlocks.fraction < 0) pCreate->cacheNumOfBlocks.fraction = tsAverageCacheBlocks; // if (pCreate->cacheNumOfBlocks.fraction < 0) pCreate->cacheNumOfBlocks.fraction = tsAverageCacheBlocks; //
//-1 for balance //-1 for balance
#ifdef CLUSTER if (pCreate->replications <= 0 || pCreate->replications > TSDB_REPLICA_MAX_NUM) {
if (pCreate->replications > TSDB_VNODES_SUPPORT - 1) pCreate->replications = TSDB_VNODES_SUPPORT - 1; mTrace("invalid db option replications: %d", pCreate->replications);
#else return TSDB_CODE_INVALID_OPTION;
pCreate->replications = 1; }
#endif
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { if (pCreate->commitLog < 0 || pCreate->commitLog > 1) {
mTrace("invalid db option commitLog: %d", pCreate->commitLog); mTrace("invalid db option commitLog: %d", pCreate->commitLog);
...@@ -316,7 +315,7 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) { ...@@ -316,7 +315,7 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue; if (pDnode == NULL) continue;
if (pDnode->status == TSDB_STATUS_OFFLINE) continue; if (pDnode->status == TSDB_DNODE_STATUS_OFFLINE) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus == TSDB_VN_STATUS_DROPPING) { if (pVload->dropStatus == TSDB_VN_STATUS_DROPPING) {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "mgmt.h" #include "mgmt.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tstatus.h" #include "tstatus.h"
#include "tstatus.h"
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType); bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtGetDnodesNum(); int mgmtGetDnodesNum();
...@@ -43,9 +44,9 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { ...@@ -43,9 +44,9 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
pDnode->openVnodes = 0; pDnode->openVnodes = 0;
#ifdef CLUSTER #ifdef CLUSTER
pDnode->status = TSDB_STATUS_OFFLINE; pDnode->status = TSDB_DNODE_STATUS_OFFLINE;
#else #else
pDnode->status = TSDB_STATUS_READY; pDnode->status = TSDB_DNODE_STATUS_READY;
#endif #endif
} }
...@@ -57,9 +58,9 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { ...@@ -57,9 +58,9 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
if (pVload->vgId != 0) { if (pVload->vgId != 0) {
mTrace("dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s", mTrace("dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s",
taosIpStr(pDnode->privateIp), i, pVload->vgId, taosIpStr(pDnode->privateIp), i, pVload->vgId,
pVload->status, sdbDnodeStatusStr[pVload->status], pVload->status, taosGetDnodeStatusStr(pVload->status),
pVload->dropStatus, sdbVnodeDropStateStr[pVload->dropStatus], pVload->dropStatus, taosGetVnodeDropStatusStr(pVload->dropStatus),
pVload->syncStatus, sdbVnodeSyncStatusStr[pVload->syncStatus]); pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus));
totalVnodes++; totalVnodes++;
} }
} }
...@@ -196,11 +197,11 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { ...@@ -196,11 +197,11 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbDnodeStatusStr[pDnode->status]); strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbDnodeBalanceStateStr[pDnode->lbState]); strcpy(pWrite, taosGetDnodeBalanceStateStr(pDnode->lbState));
cols++; cols++;
tinet_ntoa(ipstr, pDnode->publicIp); tinet_ntoa(ipstr, pDnode->publicIp);
...@@ -292,7 +293,7 @@ int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn) ...@@ -292,7 +293,7 @@ int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbDnodeStatusStr[pDnode->status]); strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
cols++; cols++;
numOfRows++; numOfRows++;
......
...@@ -296,7 +296,7 @@ pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode) { ...@@ -296,7 +296,7 @@ pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode) {
taosTmrStopA(&pVnode->commitTimer); taosTmrStopA(&pVnode->commitTimer);
if (pVnode->status == TSDB_STATUS_UNSYNCED) { if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_UNSYNCED) {
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer); taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
dTrace("vid:%d, it is in unsyc state, commit later", pVnode->vnode); dTrace("vid:%d, it is in unsyc state, commit later", pVnode->vnode);
return pVnode->commitThread; return pVnode->commitThread;
......
...@@ -1290,7 +1290,7 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[] ...@@ -1290,7 +1290,7 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[]
pCompBlock->len += wlen; pCompBlock->len += wlen;
} }
dTrace("vid: %d vnode compStorage size is: %ld", pObj->vnode, pVnode->vnodeStatistic.compStorage); dTrace("vid:%d, vnode compStorage size is: %ld", pObj->vnode, pVnode->vnodeStatistic.compStorage);
pCompBlock->algorithm = pCfg->compression; pCompBlock->algorithm = pCfg->compression;
pCompBlock->numOfPoints = points; pCompBlock->numOfPoints = points;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeRead.h" #include "vnodeRead.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "vnodeStore.h"
#pragma GCC diagnostic ignored "-Wint-conversion" #pragma GCC diagnostic ignored "-Wint-conversion"
extern int tsMaxQueues; extern int tsMaxQueues;
...@@ -154,6 +155,11 @@ int vnodeInitShell() { ...@@ -154,6 +155,11 @@ int vnodeInitShell() {
} }
int vnodeOpenShellVnode(int vnode) { int vnodeOpenShellVnode(int vnode) {
if (shellList[vnode] != NULL) {
dError("vid:%d, shell is already opened", vnode);
return -1;
}
const int32_t MIN_NUM_OF_SESSIONS = 300; const int32_t MIN_NUM_OF_SESSIONS = 300;
SVnodeCfg *pCfg = &vnodeList[vnode].cfg; SVnodeCfg *pCfg = &vnodeList[vnode].cfg;
...@@ -162,23 +168,29 @@ int vnodeOpenShellVnode(int vnode) { ...@@ -162,23 +168,29 @@ int vnodeOpenShellVnode(int vnode) {
size_t size = sessions * sizeof(SShellObj); size_t size = sessions * sizeof(SShellObj);
shellList[vnode] = (SShellObj *)calloc(1, size); shellList[vnode] = (SShellObj *)calloc(1, size);
if (shellList[vnode] == NULL) { if (shellList[vnode] == NULL) {
dError("vid:%d failed to allocate shellObj, size:%d", vnode, size); dError("vid:%d, sessions:%d, failed to allocate shellObj, size:%d", vnode, pCfg->maxSessions, size);
return -1; return -1;
} }
if(taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]) != TSDB_CODE_SUCCESS) { if(taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]) != TSDB_CODE_SUCCESS) {
dError("vid:%d, sessions:%d, failed to open shell", vnode, pCfg->maxSessions);
return -1; return -1;
} }
dTrace("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void vnodeDelayedFreeResource(void *param, void *tmrId) { static void vnodeDelayedFreeResource(void *param, void *tmrId) {
int32_t vnode = *(int32_t*) param; int32_t vnode = *(int32_t*) param;
taosCloseRpcChann(pShellServer, vnode); // close connection dTrace("vid:%d, start to free resources", vnode);
tfree (shellList[vnode]); //free SShellObj
taosCloseRpcChann(pShellServer, vnode); // close connection
tfree(shellList[vnode]); //free SShellObj
tfree(param); tfree(param);
memset(vnodeList + vnode, 0, sizeof(SVnodeObj));
vnodeCalcOpenVnodes();
} }
void vnodeCloseShellVnode(int vnode) { void vnodeCloseShellVnode(int vnode) {
...@@ -197,7 +209,7 @@ void vnodeCloseShellVnode(int vnode) { ...@@ -197,7 +209,7 @@ void vnodeCloseShellVnode(int vnode) {
* 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access. * 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access.
*/ */
dTrace("vid:%d, free resources in 500ms", vnode); dTrace("vid:%d, free resources in 500ms", vnode);
taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl); taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl);
} }
void vnodeCleanUpShell() { void vnodeCleanUpShell() {
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeStore.h" #include "vnodeStore.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "tstatus.h"
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic warning "-Woverflow" #pragma GCC diagnostic warning "-Woverflow"
...@@ -30,12 +31,14 @@ int tsMaxVnode = -1; ...@@ -30,12 +31,14 @@ int tsMaxVnode = -1;
int tsOpenVnodes = 0; int tsOpenVnodes = 0;
SVnodeObj *vnodeList = NULL; SVnodeObj *vnodeList = NULL;
int vnodeInitStoreVnode(int vnode) { static int vnodeInitStoreVnode(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode; SVnodeObj *pVnode = vnodeList + vnode;
pVnode->vnode = vnode; pVnode->vnode = vnode;
vnodeOpenMetersVnode(vnode); vnodeOpenMetersVnode(vnode);
if (pVnode->cfg.maxSessions == 0) return 0; if (pVnode->cfg.maxSessions <= 0) {
return TSDB_CODE_SUCCESS;
}
pVnode->firstKey = taosGetTimestamp(pVnode->cfg.precision); pVnode->firstKey = taosGetTimestamp(pVnode->cfg.precision);
...@@ -45,9 +48,10 @@ int vnodeInitStoreVnode(int vnode) { ...@@ -45,9 +48,10 @@ int vnodeInitStoreVnode(int vnode) {
return -1; return -1;
} }
if (vnodeInitFile(vnode) < 0) return -1; if (vnodeInitFile(vnode) < 0) {
dError("vid:%d, files init failed.", pVnode->vnode);
// vnodeOpenMeterMgmtStoreVnode(vnode); return -1;
}
if (vnodeInitCommit(vnode) < 0) { if (vnodeInitCommit(vnode) < 0) {
dError("vid:%d, commit init failed.", pVnode->vnode); dError("vid:%d, commit init failed.", pVnode->vnode);
...@@ -70,10 +74,17 @@ int vnodeOpenVnode(int vnode) { ...@@ -70,10 +74,17 @@ int vnodeOpenVnode(int vnode) {
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
// vnode is empty // vnode is empty
if (pVnode->cfg.maxSessions == 0) return 0; if (pVnode->cfg.maxSessions <= 0) {
return TSDB_CODE_SUCCESS;
}
if (!(pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE || pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING)) {
dError("vid:%d, status:%s, cannot enter open operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS;
}
dTrace("vid:%d, status:%s, start to open", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pthread_mutex_lock(&dmutex); pthread_mutex_lock(&dmutex);
// vnodeOpenMeterMgmtVnode(vnode);
// not enough memory, abort // not enough memory, abort
if ((code = vnodeOpenShellVnode(vnode)) != TSDB_CODE_SUCCESS) { if ((code = vnodeOpenShellVnode(vnode)) != TSDB_CODE_SUCCESS) {
...@@ -93,14 +104,13 @@ int vnodeOpenVnode(int vnode) { ...@@ -93,14 +104,13 @@ int vnodeOpenVnode(int vnode) {
vnodeOpenStreams(pVnode, NULL); vnodeOpenStreams(pVnode, NULL);
#endif #endif
dTrace("vid:%d, vnode is opened, openVnodes:%d", vnode, tsOpenVnodes); dTrace("vid:%d, vnode is opened, openVnodes:%d, status:%s", vnode, tsOpenVnodes, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return 0; return TSDB_CODE_SUCCESS;
} }
static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) { static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
if (pVnode->meterList == NULL) { if (pVnode->meterList == NULL) {
assert(pVnode->cfg.maxSessions == 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -119,7 +129,7 @@ static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) { ...@@ -119,7 +129,7 @@ static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
return ready? TSDB_CODE_SUCCESS:TSDB_CODE_ACTION_IN_PROGRESS; return ready? TSDB_CODE_SUCCESS:TSDB_CODE_ACTION_IN_PROGRESS;
} }
int vnodeCloseVnode(int vnode) { static int vnodeCloseVnode(int vnode) {
if (vnodeList == NULL) return TSDB_CODE_SUCCESS; if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
SVnodeObj* pVnode = &vnodeList[vnode]; SVnodeObj* pVnode = &vnodeList[vnode];
...@@ -130,12 +140,23 @@ int vnodeCloseVnode(int vnode) { ...@@ -130,12 +140,23 @@ int vnodeCloseVnode(int vnode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) {
dTrace("vid:%d, status:%s, another performed delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_SUCCESS;
} else {
dTrace("vid:%d, status:%s, enter close operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pVnode->vnodeStatus = TSDB_VNODE_STATUS_CLOSING;
}
// set the meter is dropped flag // set the meter is dropped flag
if (vnodeMarkAllMetersDropped(pVnode) != TSDB_CODE_SUCCESS) { if (vnodeMarkAllMetersDropped(pVnode) != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&dmutex); pthread_mutex_unlock(&dmutex);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
dTrace("vid:%d, status:%s, enter delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pVnode->vnodeStatus = TSDB_VNODE_STATUS_DELETING;
vnodeCloseStream(vnodeList + vnode); vnodeCloseStream(vnodeList + vnode);
vnodeCancelCommit(vnodeList + vnode); vnodeCancelCommit(vnodeList + vnode);
vnodeClosePeerVnode(vnode); vnodeClosePeerVnode(vnode);
...@@ -149,9 +170,6 @@ int vnodeCloseVnode(int vnode) { ...@@ -149,9 +170,6 @@ int vnodeCloseVnode(int vnode) {
if (tsMaxVnode == vnode) tsMaxVnode = vnode - 1; if (tsMaxVnode == vnode) tsMaxVnode = vnode - 1;
tfree(vnodeList[vnode].meterIndex); tfree(vnodeList[vnode].meterIndex);
memset(vnodeList + vnode, 0, sizeof(SVnodeObj));
vnodeCalcOpenVnodes();
pthread_mutex_unlock(&dmutex); pthread_mutex_unlock(&dmutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -160,7 +178,12 @@ int vnodeCloseVnode(int vnode) { ...@@ -160,7 +178,12 @@ int vnodeCloseVnode(int vnode) {
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) { int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
char fileName[128]; char fileName[128];
vnodeList[vnode].status = TSDB_STATUS_CREATING; if (vnodeList[vnode].vnodeStatus != TSDB_VNODE_STATUS_OFFLINE) {
dError("vid:%d, status:%s, cannot enter create operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS;
}
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_CREATING;
sprintf(fileName, "%s/vnode%d", tsDirectory, vnode); sprintf(fileName, "%s/vnode%d", tsDirectory, vnode);
mkdir(fileName, 0755); mkdir(fileName, 0755);
...@@ -177,14 +200,14 @@ int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) { ...@@ -177,14 +200,14 @@ int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
return TSDB_CODE_VG_INIT_FAILED; return TSDB_CODE_VG_INIT_FAILED;
} }
if (vnodeInitStoreVnode(vnode) != 0) { if (vnodeInitStoreVnode(vnode) < 0) {
return TSDB_CODE_VG_COMMITLOG_INIT_FAILED; return TSDB_CODE_VG_COMMITLOG_INIT_FAILED;
} }
return vnodeOpenVnode(vnode); return vnodeOpenVnode(vnode);
} }
void vnodeRemoveDataFiles(int vnode) { static void vnodeRemoveDataFiles(int vnode) {
char vnodeDir[TSDB_FILENAME_LEN]; char vnodeDir[TSDB_FILENAME_LEN];
char dfilePath[TSDB_FILENAME_LEN]; char dfilePath[TSDB_FILENAME_LEN];
char linkFile[TSDB_FILENAME_LEN]; char linkFile[TSDB_FILENAME_LEN];
...@@ -234,12 +257,21 @@ int vnodeRemoveVnode(int vnode) { ...@@ -234,12 +257,21 @@ int vnodeRemoveVnode(int vnode) {
if (vnodeList == NULL) return TSDB_CODE_SUCCESS; if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
if (vnodeList[vnode].cfg.maxSessions > 0) { if (vnodeList[vnode].cfg.maxSessions > 0) {
int32_t ret = vnodeCloseVnode(vnode); SVnodeObj* pVnode = &vnodeList[vnode];
if (ret != TSDB_CODE_SUCCESS) { if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING
return ret; || pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE
|| pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) {
dError("vid:%d, status:%s, cannot enter close/delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_ACTION_IN_PROGRESS;
} else {
int32_t ret = vnodeCloseVnode(vnode);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
vnodeRemoveDataFiles(vnode);
} }
vnodeRemoveDataFiles(vnode);
} else { } else {
dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions); dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions);
vnodeList[vnode].cfg.maxSessions = 0; //reset value vnodeList[vnode].cfg.maxSessions = 0; //reset value
...@@ -293,7 +325,7 @@ void vnodeCleanUpOneVnode(int vnode) { ...@@ -293,7 +325,7 @@ void vnodeCleanUpOneVnode(int vnode) {
again = 1; again = 1;
if (vnodeList[vnode].pCachePool) { if (vnodeList[vnode].pCachePool) {
vnodeList[vnode].status = TSDB_STATUS_OFFLINE; vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE;
vnodeClosePeerVnode(vnode); vnodeClosePeerVnode(vnode);
} }
...@@ -322,7 +354,7 @@ void vnodeCleanUpVnodes() { ...@@ -322,7 +354,7 @@ void vnodeCleanUpVnodes() {
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) { for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) {
if (vnodeList[vnode].pCachePool) { if (vnodeList[vnode].pCachePool) {
vnodeList[vnode].status = TSDB_STATUS_OFFLINE; vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE;
vnodeClosePeerVnode(vnode); vnodeClosePeerVnode(vnode);
} }
} }
......
...@@ -171,7 +171,7 @@ void vnodeCloseStream(SVnodeObj *pVnode) { ...@@ -171,7 +171,7 @@ void vnodeCloseStream(SVnodeObj *pVnode) {
void vnodeUpdateStreamRole(SVnodeObj *pVnode) { void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
/* SMeterObj *pObj; */ /* SMeterObj *pObj; */
int newRole = (pVnode->status == TSDB_STATUS_MASTER) ? 1 : 0; int newRole = (pVnode->vnodeStatus == TSDB_VNODE_STATUS_MASTER) ? 1 : 0;
if (newRole != pVnode->streamRole) { if (newRole != pVnode->streamRole) {
dTrace("vid:%d, stream role is changed to:%d", pVnode->vnode, newRole); dTrace("vid:%d, stream role is changed to:%d", pVnode->vnode, newRole);
pVnode->streamRole = newRole; pVnode->streamRole = newRole;
......
...@@ -30,7 +30,7 @@ int mgmtInitDnodes() { ...@@ -30,7 +30,7 @@ int mgmtInitDnodes() {
dnodeObj.createdTime = (int64_t)tsRebootTime * 1000; dnodeObj.createdTime = (int64_t)tsRebootTime * 1000;
dnodeObj.lastReboot = tsRebootTime; dnodeObj.lastReboot = tsRebootTime;
dnodeObj.numOfCores = (uint16_t)tsNumOfCores; dnodeObj.numOfCores = (uint16_t)tsNumOfCores;
dnodeObj.status = TSDB_STATUS_READY; dnodeObj.status = TSDB_DNODE_STATUS_READY;
dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
dnodeObj.thandle = (void*)(1); //hack way dnodeObj.thandle = (void*)(1); //hack way
......
...@@ -82,7 +82,7 @@ void mgmtCleanUpDnodeInt() {} ...@@ -82,7 +82,7 @@ void mgmtCleanUpDnodeInt() {}
void mgmtProcessDnodeStatus(void *handle, void *tmrId) { void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
SDnodeObj *pObj = &dnodeObj; SDnodeObj *pObj = &dnodeObj;
pObj->openVnodes = tsOpenVnodes; pObj->openVnodes = tsOpenVnodes;
pObj->status = TSDB_STATUS_READY; pObj->status = TSDB_DNODE_STATUS_READY;
float memoryUsedMB = 0; float memoryUsedMB = 0;
taosGetSysMemory(&memoryUsedMB); taosGetSysMemory(&memoryUsedMB);
...@@ -97,7 +97,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) { ...@@ -97,7 +97,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
if (vnodeList[vnode].cfg.maxSessions <= 0) { if (vnodeList[vnode].cfg.maxSessions <= 0) {
pVload->dropStatus = TSDB_VN_STATUS_READY; pVload->dropStatus = TSDB_VN_STATUS_READY;
pVload->status = TSDB_VN_STATUS_READY; pVload->status = TSDB_VN_STATUS_READY;
mPrint("vid:%d, drop finished", pObj->privateIp, vnode); mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr); taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr);
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnode.h" #include "vnode.h"
#include "tstatus.h"
int vnodeInitPeer(int numOfThreads) { return 0; } int vnodeInitPeer(int numOfThreads) { return 0; }
...@@ -30,8 +31,8 @@ void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode) {} ...@@ -30,8 +31,8 @@ void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode) {}
int vnodeOpenPeerVnode(int vnode) { int vnodeOpenPeerVnode(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode; SVnodeObj *pVnode = vnodeList + vnode;
pVnode->status = (pVnode->cfg.replications > 1) ? TSDB_STATUS_UNSYNCED : TSDB_STATUS_MASTER; pVnode->vnodeStatus = (pVnode->cfg.replications > 1) ? TSDB_VNODE_STATUS_UNSYNCED : TSDB_VNODE_STATUS_MASTER;
dTrace("vid:%d, vnode status:%d numOfPeers:%d", vnode, pVnode->status, pVnode->cfg.replications-1); dTrace("vid:%d, status:%s numOfPeers:%d", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus), pVnode->cfg.replications - 1);
vnodeUpdateStreamRole(pVnode); vnodeUpdateStreamRole(pVnode);
return 0; return 0;
} }
......
...@@ -13,10 +13,54 @@ ...@@ -13,10 +13,54 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
char* sdbDnodeStatusStr[] = {"offline", "creating", "unsynced", "slave", "master", "ready"}; #include "taosmsg.h"
#include "tsdb.h"
char* sdbDnodeBalanceStateStr[] = {"balanced", "balancing", "offline removing", "shell removing"}; const char* taosGetVnodeStatusStr(int vnodeStatus) {
switch (vnodeStatus) {
case TSDB_VNODE_STATUS_OFFLINE:return "offline";
case TSDB_VNODE_STATUS_CREATING: return "creating";
case TSDB_VNODE_STATUS_UNSYNCED: return "unsynced";
case TSDB_VNODE_STATUS_SLAVE: return "slave";
case TSDB_VNODE_STATUS_MASTER: return "master";
case TSDB_VNODE_STATUS_CLOSING: return "closing";
case TSDB_VNODE_STATUS_DELETING: return "deleting";
default: return "undefined";
}
}
char* sdbVnodeSyncStatusStr[] = {"init", "syncing", "sync_cache", "sync_file"}; const char* taosGetDnodeStatusStr(int dnodeStatus) {
switch (dnodeStatus) {
case TSDB_DNODE_STATUS_OFFLINE: return "offline";
case TSDB_DNODE_STATUS_READY: return "ready";
default: return "undefined";
}
}
char* sdbVnodeDropStateStr[] = {"ready", "dropping"}; const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus) {
switch (dnodeBalanceStatus) {
case LB_DNODE_STATE_BALANCED: return "balanced";
case LB_DNODE_STATE_BALANCING: return "balancing";
case LB_DNODE_STATE_OFFLINE_REMOVING: return "offline removing";
case LB_DNODE_STATE_SHELL_REMOVING: return "removing";
default: return "undefined";
}
}
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) {
switch (vnodeSyncStatus) {
case STDB_SSTATUS_INIT: return "init";
case TSDB_SSTATUS_SYNCING: return "syncing";
case TSDB_SSTATUS_SYNC_CACHE: return "sync_cache";
case TSDB_SSTATUS_SYNC_FILE: return "sync_file";
default: return "undefined";
}
}
const char* taosGetVnodeDropStatusStr(int dropping) {
switch (dropping) {
case 0: return "ready";
case 1: return "dropping";
default: return "undefined";
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册