未验证 提交 caee36aa 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #1363 from taosdata/refact/slguan

Refact/slguan
...@@ -57,11 +57,10 @@ void dnodeCleanupMClient() { ...@@ -57,11 +57,10 @@ void dnodeCleanupMClient() {
} }
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
if (dnodeProcessMgmtRspFp[pMsg->msgType]) { if (dnodeProcessMgmtRspFp[pMsg->msgType]) {
(*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("%s is not processed in mclient", taosMsg[pMsg->msgType]);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -226,20 +226,18 @@ static void dnodeCheckDataDirOpenned(char *dir) { ...@@ -226,20 +226,18 @@ static void dnodeCheckDataDirOpenned(char *dir) {
static int32_t dnodeInitStorage() { static int32_t dnodeInitStorage() {
struct stat dirstat; struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) { if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755); mkdir(dataDir, 0755);
} }
char fileName[128]; sprintf(tsMnodeDir, "%s/mnode", dataDir);
sprintf(fileName, "%s/tsdb", tsDirectory); sprintf(tsVnodeDir, "%s/vnode", dataDir);
mkdir(fileName, 0755); sprintf(tsDnodeDir, "%s/dnode", dataDir);
sprintf(fileName, "%s/data", tsDirectory); mkdir(tsMnodeDir, 0755);
mkdir(fileName, 0755); mkdir(tsVnodeDir, 0755);
sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory); mkdir(tsDnodeDir, 0755);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDataDirOpenned(dataDir); dnodeCheckDataDirOpenned(tsDnodeDir);
dPrint("storage directory is initialized"); dPrint("storage directory is initialized");
return 0; return 0;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t vnode;
int32_t status; // status: master, slave, notready, deleting int32_t status; // status: master, slave, notready, deleting
int32_t refCount; // reference count int32_t refCount; // reference count
int64_t version; int64_t version;
...@@ -43,7 +44,7 @@ typedef struct { ...@@ -43,7 +44,7 @@ typedef struct {
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes(); static void dnodeCleanupVnodes();
static int32_t dnodeOpenVnode(int32_t vgId); static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir);
static void dnodeCleanupVnode(SVnodeObj *pVnode); static void dnodeCleanupVnode(SVnodeObj *pVnode);
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg);
static void dnodeDropVnode(SVnodeObj *pVnode); static void dnodeDropVnode(SVnodeObj *pVnode);
...@@ -79,7 +80,25 @@ int32_t dnodeInitMgmt() { ...@@ -79,7 +80,25 @@ int32_t dnodeInitMgmt() {
} }
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return dnodeOpenVnodes(); SMDCreateVnodeMsg cfg;
cfg.cfg.vgId = 1;
cfg.cfg.precision = 0;
cfg.vnode = 1;
cfg.cfg.maxSessions = 1000;
cfg.cfg.daysPerFile = 10;
dnodeCreateVnode(&cfg);
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
dnodeDropVnode(pVnode);
dnodeCreateVnode(&cfg);
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
dnodeCleanupVnodes();
dnodeOpenVnodes();
dnodeCleanupVnodes();
//return dnodeOpenVnodes();
} }
void dnodeCleanupMgmt() { void dnodeCleanupMgmt() {
...@@ -98,14 +117,13 @@ void dnodeMgmt(SRpcMsg *pMsg) { ...@@ -98,14 +117,13 @@ void dnodeMgmt(SRpcMsg *pMsg) {
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; SRpcMsg rsp;
rsp.handle = pMsg->handle;
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
} }
SRpcMsg rsp;
rsp.handle = pMsg->handle;
rsp.code = terrno;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); // free the received message rpcFreeCont(pMsg->pCont); // free the received message
} }
...@@ -150,28 +168,55 @@ void dnodeReleaseVnode(void *pVnode) { ...@@ -150,28 +168,55 @@ void dnodeReleaseVnode(void *pVnode) {
} }
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
dPrint("open all vnodes"); DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS;
}
int32_t numOfVnodes = 0;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if (de->d_type & DT_DIR) {
if (strncmp("vnode", de->d_name, 5) != 0) continue;
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
char tsdbDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/%s", tsVnodeDir, de->d_name);
int32_t code = dnodeOpenVnode(vnode, tsdbDir);
if (code == 0) {
numOfVnodes++;
}
}
}
closedir(dir);
dPrint("all vnodes is opened, num:%d", numOfVnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeCleanupVnodes() { static void dnodeCleanupVnodes() {
dPrint("clean all vnodes"); int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, dnodeCleanupVnode);
dPrint("all vnodes is opened, num:%d", num);
} }
static int32_t dnodeOpenVnode(int32_t vgId) { static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
void *pTsdb = tsdbOpenRepo(rootDir); void *pTsdb = tsdbOpenRepo(rootDir);
if (pTsdb != NULL) { if (pTsdb == NULL) {
dError("failed to open vnode:%d in dir:%s, reason:%s", vnode, rootDir, tstrerror(terrno));
return terrno; return terrno;
} }
//STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb);
SVnodeObj vnodeObj; SVnodeObj vnodeObj;
vnodeObj.vgId = vgId; vnodeObj.vgId = vnode; //tsdbInfo->tsdbCfg.vgId;
vnodeObj.vnode = vnode; //tsdbInfo->tsdbCfg.tsdbId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = version;
vnodeObj.wworker = dnodeAllocateWriteWorker(); vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
...@@ -182,6 +227,7 @@ static int32_t dnodeOpenVnode(int32_t vgId) { ...@@ -182,6 +227,7 @@ static int32_t dnodeOpenVnode(int32_t vgId) {
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
dTrace("open vnode:%d in %s", vnodeObj.vnode, rootDir);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -210,11 +256,12 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { ...@@ -210,11 +256,12 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
} }
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); dTrace("cleanup vnode:%d", pVnode->vnode);
} }
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
tsdbCfg.vgId = pVnodeCfg->cfg.vgId;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode; tsdbCfg.tsdbId = pVnodeCfg->vnode;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
...@@ -225,15 +272,16 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -225,15 +272,16 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxCacheSize = -1; tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb != NULL) { if (pTsdb == NULL) {
return terrno; return terrno;
} }
SVnodeObj vnodeObj; SVnodeObj vnodeObj;
vnodeObj.vgId = pVnodeCfg->cfg.vgId; vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.vnode = pVnodeCfg->vnode;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
...@@ -264,6 +312,7 @@ static void dnodeDropVnode(SVnodeObj *pVnode) { ...@@ -264,6 +312,7 @@ static void dnodeDropVnode(SVnodeObj *pVnode) {
} }
dnodeCleanupVnode(pVnode); dnodeCleanupVnode(pVnode);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
} }
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -281,9 +330,8 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -281,9 +330,8 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
} else { } else {
rpcRsp.code = dnodeCreateVnode(pCreate); rpcRsp.code = dnodeCreateVnode(pCreate);
} }
rpcRsp.code = TSDB_CODE_SUCCESS;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
} }
static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -301,7 +349,6 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -301,7 +349,6 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
} }
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
} }
static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -321,7 +368,6 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -321,7 +368,6 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
} }
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
} }
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
...@@ -342,7 +388,6 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -342,7 +388,6 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
// dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); // dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeSendStatusMsg(void *handle, void *tmrId) { static void dnodeSendStatusMsg(void *handle, void *tmrId) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) { if (tsStatusTimer == NULL) {
......
...@@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { ...@@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("%s is not processed in mserver", taosMsg[pMsg->msgType]);
rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -60,11 +60,13 @@ int32_t dnodeInitRead() { ...@@ -60,11 +60,13 @@ int32_t dnodeInitRead() {
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
dPrint("dnode read is opened");
return 0; return 0;
} }
void dnodeCleanupRead() { void dnodeCleanupRead() {
taosCloseQset(readQset); taosCloseQset(readQset);
dPrint("dnode read is closed");
} }
void dnodeRead(SRpcMsg *pMsg) { void dnodeRead(SRpcMsg *pMsg) {
......
...@@ -76,35 +76,43 @@ int32_t dnodeInitWrite() { ...@@ -76,35 +76,43 @@ int32_t dnodeInitWrite() {
wWorkerPool.writeWorker[i].workerId = i; wWorkerPool.writeWorker[i].workerId = i;
} }
dPrint("dnode write is opened");
return 0; return 0;
} }
void dnodeCleanupWrite() { void dnodeCleanupWrite() {
free(wWorkerPool.writeWorker); free(wWorkerPool.writeWorker);
dPrint("dnode write is closed");
} }
void dnodeWrite(SRpcMsg *pMsg) { void dnodeWrite(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL; SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes; int32_t numOfVnodes = 0;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
// TODO parse head, get number of vnodes;
numOfVnodes = 1;
} else {
numOfVnodes = 1;
}
if ( numOfVnodes > 1) { if (numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = numOfVnodes; pRpcContext->numOfVnodes = numOfVnodes;
} }
while (leftLen > 0) { while (leftLen > 0) {
// todo: parse head, get vgId, contLen SWriteMsgHead *pHead = (SWriteMsgHead *) pCont;
int32_t vgId = htonl(pHead->vgId);
int32_t contLen = htonl(pHead->contLen);
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId); void *pVnode = dnodeGetVnode(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= contLen;
pCont -= contLen;
continue; continue;
} }
...@@ -118,20 +126,37 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -118,20 +126,37 @@ void dnodeWrite(SRpcMsg *pMsg) {
taos_queue queue = dnodeGetVnodeWworker(pVnode); taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, &writeMsg);
// next vnode // next vnode
leftLen -= contLen; leftLen -= contLen;
pCont -= contLen; pCont -= contLen;
queuedMsgNum++;
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
} }
} }
void *dnodeAllocateWriteWorker() { void *dnodeAllocateWriteWorker() {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue != NULL) return queue;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) return NULL;
taosAddIntoQset(pWorker->qset, queue);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
...@@ -140,14 +165,11 @@ void *dnodeAllocateWriteWorker() { ...@@ -140,14 +165,11 @@ void *dnodeAllocateWriteWorker() {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process read queue, reason:%s", strerror(errno));
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
} }
} } else {
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue) {
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
return queue; return queue;
} }
......
...@@ -39,12 +39,7 @@ extern "C" { ...@@ -39,12 +39,7 @@ extern "C" {
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
// internal globals typedef struct {
extern char version[];
extern void *tsMgmtTmr;
extern char tsMgmtDirectory[];
typedef struct {
uint32_t privateIp; uint32_t privateIp;
int32_t sid; int32_t sid;
uint32_t moduleStatus; uint32_t moduleStatus;
...@@ -87,11 +82,6 @@ typedef struct { ...@@ -87,11 +82,6 @@ typedef struct {
int32_t vnode; int32_t vnode;
} SVnodeGid; } SVnodeGid;
typedef struct {
int32_t sid;
int32_t vgId; // vnode group ID
} STableGid;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type; int8_t type;
...@@ -248,16 +238,32 @@ typedef struct { ...@@ -248,16 +238,32 @@ typedef struct {
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS];
void * signature; void * signature;
uint16_t payloadLen; /* length of payload*/ uint16_t payloadLen;
char payload[]; /* payload for wildcard match in show tables */ char payload[];
} SShowObj; } SShowObj;
//mgmtSystem typedef struct {
uint8_t msgType;
int8_t expected;
int8_t received;
int8_t successed;
int32_t contLen;
int32_t code;
void *ahandle;
void *thandle;
void *pCont;
SDbObj *pDb;
SUserObj *pUser;
} SQueuedMsg;
int32_t mgmtInitSystem(); int32_t mgmtInitSystem();
int32_t mgmtStartSystem(); int32_t mgmtStartSystem();
void mgmtCleanUpSystem(); void mgmtCleanUpSystem();
void mgmtStopSystem(); void mgmtStopSystem();
extern char version[];
extern void *tsMgmtTmr;
extern char tsMnodeDir[];
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -238,11 +238,20 @@ typedef struct SSchema { ...@@ -238,11 +238,20 @@ typedef struct SSchema {
} SSchema; } SSchema;
typedef struct { typedef struct {
int32_t vgId;
int32_t vnode; //the index of vnode int32_t vnode; //the index of vnode
uint32_t ip; uint32_t ip;
} SVnodeDesc; } SVnodeDesc;
typedef struct { typedef struct {
int32_t contLen;
int32_t vgId;
} SWriteMsgHead;
typedef struct {
int32_t contLen;
int32_t vgId;
int8_t tableType; int8_t tableType;
int16_t numOfColumns; int16_t numOfColumns;
int16_t numOfTags; int16_t numOfTags;
...@@ -250,7 +259,6 @@ typedef struct { ...@@ -250,7 +259,6 @@ typedef struct {
int32_t sversion; int32_t sversion;
int32_t tagDataLen; int32_t tagDataLen;
int32_t sqlDataLen; int32_t sqlDataLen;
int32_t contLen;
int32_t numOfVPeers; int32_t numOfVPeers;
uint64_t uid; uint64_t uid;
uint64_t superTableUid; uint64_t superTableUid;
...@@ -336,6 +344,7 @@ typedef struct { ...@@ -336,6 +344,7 @@ typedef struct {
} SMgmtHead; } SMgmtHead;
typedef struct { typedef struct {
int32_t vgId;
int32_t sid; int32_t sid;
int32_t numOfVPeers; int32_t numOfVPeers;
uint64_t uid; uint64_t uid;
......
...@@ -28,22 +28,6 @@ bool mgmtCheckQhandle(uint64_t qhandle); ...@@ -28,22 +28,6 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle); void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle);
enum {
TSDB_PROCESS_CREATE_VGROUP,
TSDB_PROCESS_CREATE_VGROUP_GET_META,
TSDB_PROCESS_CREATE_TABLE,
TSDB_PROCESS_CREATE_TABLE_GET_META,
};
typedef struct {
void *thandle; // come from uplayer
void *ahandle; // object to process
void *cont; // additional information of object to process
int32_t type; // the type of sync process
int32_t received; // num of received, such as numOfVnodes
int32_t contLen; // the length of additional information
} SProcessInfo;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,13 +23,16 @@ extern "C" { ...@@ -23,13 +23,16 @@ extern "C" {
int32_t mgmtInitShell(); int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg));
typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg);
void mgmtSendSimpleResp(void *thandle, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -33,7 +33,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); ...@@ -33,7 +33,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter);
...@@ -44,10 +43,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); ...@@ -44,10 +43,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -29,7 +29,7 @@ void mgmtCleanUpVgroups(); ...@@ -29,7 +29,7 @@ void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
SVgObj *mgmtCreateVgroup(SDbObj *pDb); void mgmtCreateVgroup(SQueuedMsg *pMsg);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup);
......
...@@ -56,10 +56,10 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { ...@@ -56,10 +56,10 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
} }
if (selectedVnode == -1) { if (selectedVnode == -1) {
mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); mError("alloc vnode failed, free vnodes:%d", pDnode->numOfFreeVnodes);
return -1; return -1;
} else { } else {
mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); mTrace("allocate vnode:%d, last allocated vnode:%d", selectedVnode, lastAllocVode);
pVgroup->vnodeGid[0].vnode = selectedVnode; pVgroup->vnodeGid[0].vnode = selectedVnode;
pDnode->lastAllocVnode = selectedVnode + 1; pDnode->lastAllocVnode = selectedVnode + 1;
if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0;
......
...@@ -198,7 +198,7 @@ int32_t mgmtInitChildTables() { ...@@ -198,7 +198,7 @@ int32_t mgmtInitChildTables() {
tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize, tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize,
"ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction); "ctables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtChildTableAction);
if (tsChildTableSdb == NULL) { if (tsChildTableSdb == NULL) {
mError("failed to init child table data"); mError("failed to init child table data");
return -1; return -1;
...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj ...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj
*pTableOut = (STableInfo *) pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create ctable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -78,45 +78,12 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { ...@@ -78,45 +78,12 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) { if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg); (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
} else { } else {
dError("%s is not processed", taosMsg[rpcMsg->msgType]); mError("%s is not processed in dclient", taosMsg[rpcMsg->msgType]);
} }
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
} }
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
// if (rpcMsg->handle == NULL) return;
//
// SProcessInfo *info = rpcMsg->handle;
// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
//
// STableInfo *pTable = info->ahandle;
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code);
// mgmtSetTableDirty(pTable, true);
// } else {
// mTrace("table:%s, created in dnode", pTable->tableId);
// mgmtSetTableDirty(pTable, false);
// }
//
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = rpcMsg->code, .msgType = 0};
// rpcSendResponse(&rpcMsg);
// } else {
// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
// mTrace("table:%s, start to process get meta", pTable->tableId);
// mgmtProcessGetTableMeta(pTable, rpcMsg->handle);
// } else {
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
// rpcSendResponse(&rpcMsg);
// }
// }
//
// free(info);
//}
//
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { //static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
// mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); // mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//} //}
...@@ -125,27 +92,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { ...@@ -125,27 +92,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
// mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); // mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//} //}
// //
//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("create vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
// if (rpcMsg->handle == NULL) return;
//
// SProcessInfo *info = rpcMsg->handle;
// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
//
// info->received++;
// SVgObj *pVgroup = info->ahandle;
//
// bool isGetMeta = false;
// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
// isGetMeta = true;
// }
//
// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
// if (info->received == pVgroup->numOfVnodes) {
// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
// free(info);
// }
//}
// //
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { //static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); // mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
......
...@@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { ...@@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg); (*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg);
} else { } else {
mError("%s is not processed", taosMsg[rpcMsg->msgType]); mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]);
} }
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -210,22 +210,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s ...@@ -210,22 +210,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// } // }
//} //}
// //
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
// for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
// SMDCreateVnodeMsg *pVpeer = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
// if (pVpeer != NULL) {
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_VNODE, pVpeer, sizeof(SMDCreateVnodeMsg), ahandle);
// }
//}
//
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { //void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { // if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
// mError("invalid msg type:%d", msgType); // mError("invalid msg type:%d", msgType);
......
...@@ -41,9 +41,9 @@ static int32_t mgmtDropDb(SDbObj *pDb); ...@@ -41,9 +41,9 @@ static int32_t mgmtDropDb(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg); static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg); static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg); static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg);
static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() { ...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() {
SDbObj tObj; SDbObj tObj;
tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj;
tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtDbAction);
if (tsDbSdb == NULL) { if (tsDbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
...@@ -383,6 +383,7 @@ static void mgmtDropDbFromSdb(SDbObj *pDb) { ...@@ -383,6 +383,7 @@ static void mgmtDropDbFromSdb(SDbObj *pDb) {
} }
static int32_t mgmtDropDb(SDbObj *pDb) { static int32_t mgmtDropDb(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) {
bool finished = mgmtCheckDropDbFinished(pDb); bool finished = mgmtCheckDropDbFinished(pDb);
if (!finished) { if (!finished) {
...@@ -405,6 +406,7 @@ static int32_t mgmtDropDb(SDbObj *pDb) { ...@@ -405,6 +406,7 @@ static int32_t mgmtDropDb(SDbObj *pDb) {
} }
} }
UNUSED_FUNC
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
if (pDb == NULL) { if (pDb == NULL) {
...@@ -904,19 +906,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { ...@@ -904,19 +906,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1); atomic_add_fetch_32(&pDb->numOfTables, -1);
} }
static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCMCreateDbMsg *pCreate = (SCMCreateDbMsg *) rpcMsg->pCont;
SCMCreateDbMsg *pCreate = pMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->daysPerFile = htonl(pCreate->daysPerFile); pCreate->daysPerFile = htonl(pCreate->daysPerFile);
...@@ -928,69 +921,58 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { ...@@ -928,69 +921,58 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) {
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock);
// pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks);
int32_t code;
if (mgmtCheckExpired()) { if (mgmtCheckExpired()) {
rpcRsp.code = TSDB_CODE_GRANT_EXPIRED; code = TSDB_CODE_GRANT_EXPIRED;
} else if (!pUser->writeAuth) { } else if (!pMsg->pUser->writeAuth) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} else { } else {
rpcRsp.code = mgmtCreateDb(pUser->pAcct, pCreate); code = mgmtCreateDb(pMsg->pUser->pAcct, pCreate);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is created by %s", pCreate->db, pUser->user); mLPrint("DB:%s is created by %s", pCreate->db, pMsg->pUser->user);
} }
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCMAlterDbMsg *pAlter = (SCMAlterDbMsg *) rpcMsg->pCont; SCMAlterDbMsg *pAlter = pMsg->pCont;
pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysPerFile = htonl(pAlter->daysPerFile);
pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->daysToKeep = htonl(pAlter->daysToKeep);
pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; pAlter->maxSessions = htonl(pAlter->maxSessions) + 1;
if (!pUser->writeAuth) { int32_t code;
rpcRsp.code = TSDB_CODE_NO_RIGHTS; if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else { } else {
rpcRsp.code = mgmtAlterDb(pUser->pAcct, pAlter); code = mgmtAlterDb(pMsg->pUser->pAcct, pAlter);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is altered by %s", pAlter->db, pUser->user); mLPrint("DB:%s is altered by %s", pAlter->db, pMsg->pUser->user);
} }
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { int32_t code;
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (pMsg->pUser->superAuth) {
if (mgmtCheckRedirect(rpcMsg->handle)) return; code = TSDB_CODE_OPS_NOT_SUPPORT;
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
if (pUser == NULL) { //if (rpcRsp.code == TSDB_CODE_SUCCESS) {
rpcRsp.code = TSDB_CODE_INVALID_USER; // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
rpcSendResponse(&rpcRsp); //}
return ;
}
if (pUser->superAuth) {
SCMDropDbMsg *pDrop = rpcMsg->pCont;
rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
}
} else { } else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} }
rpcSendResponse(&rpcRsp); if (code != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(pMsg->thandle, code);
}
} }
...@@ -43,7 +43,7 @@ static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn ...@@ -43,7 +43,7 @@ static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg); static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
...@@ -93,7 +93,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { ...@@ -93,7 +93,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) {
memset(pVload, 0, sizeof(SVnodeLoad)); memset(pVload, 0, sizeof(SVnodeLoad));
pVload->vnode = vnodeGid[i].vnode; pVload->vnode = vnodeGid[i].vnode;
pVload->vgId = vgId; pVload->vgId = vgId;
mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(vnodeGid[i].ip), vnodeGid[i].vnode, pVload->vgId); mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(pDnode->privateIp), vnodeGid[i].vnode, pVload->vgId);
mgmtCalcNumOfFreeVnodes(pDnode); mgmtCalcNumOfFreeVnodes(pDnode);
} else { } else {
mError("dnode:%s, not in dnode DB!!!", taosIpStr(vnodeGid[i].ip)); mError("dnode:%s, not in dnode DB!!!", taosIpStr(vnodeGid[i].ip));
...@@ -527,21 +527,14 @@ bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) { ...@@ -527,21 +527,14 @@ bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) {
return pDnode->status == TSDB_DN_STATUS_OFFLINE; return pDnode->status == TSDB_DN_STATUS_OFFLINE;
} }
void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont;
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCMCfgDnodeMsg *pCmCfgDnode = (SCMCfgDnodeMsg *) rpcMsg->pCont;
uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip); uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip);
if (strcmp(pUser->pAcct->user, "root") != 0) { if (strcmp(pMsg->pUser->pAcct->user, "root") != 0) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else { } else {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp); SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp);
...@@ -560,7 +553,7 @@ void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { ...@@ -560,7 +553,7 @@ void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) {
} }
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pUser->user); mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pMsg->pUser->user);
} }
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include "mgmtShell.h" #include "mgmtShell.h"
static int32_t mgmtCheckMgmtRunning(); static int32_t mgmtCheckMgmtRunning();
char tsMgmtDirectory[128] = {0};
void *tsMgmtTmr = NULL; void *tsMgmtTmr = NULL;
int32_t mgmtInitSystem() { int32_t mgmtInitSystem() {
...@@ -41,7 +40,7 @@ int32_t mgmtInitSystem() { ...@@ -41,7 +40,7 @@ int32_t mgmtInitSystem() {
} }
struct stat dirstat; struct stat dirstat;
bool fileExist = (stat(tsMgmtDirectory, &dirstat) == 0); bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (asMaster || fileExist) { if (asMaster || fileExist) {
...@@ -57,8 +56,8 @@ int32_t mgmtStartSystem() { ...@@ -57,8 +56,8 @@ int32_t mgmtStartSystem() {
mPrint("starting to initialize TDengine mgmt ..."); mPrint("starting to initialize TDengine mgmt ...");
struct stat dirstat; struct stat dirstat;
if (stat(tsMgmtDirectory, &dirstat) < 0) { if (stat(tsMnodeDir, &dirstat) < 0) {
mkdir(tsMgmtDirectory, 0755); mkdir(tsMnodeDir, 0755);
} }
if (mgmtCheckMgmtRunning() != 0) { if (mgmtCheckMgmtRunning() != 0) {
...@@ -110,7 +109,7 @@ int32_t mgmtStartSystem() { ...@@ -110,7 +109,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (sdbInitPeers(tsMgmtDirectory) < 0) { if (sdbInitPeers(tsMnodeDir) < 0) {
mError("failed to init peers"); mError("failed to init peers");
return -1; return -1;
} }
...@@ -132,7 +131,7 @@ void mgmtStopSystem() { ...@@ -132,7 +131,7 @@ void mgmtStopSystem() {
} }
mgmtCleanUpSystem(); mgmtCleanUpSystem();
remove(tsMgmtDirectory); remove(tsMnodeDir);
} }
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
......
...@@ -224,7 +224,7 @@ int32_t mgmtInitNormalTables() { ...@@ -224,7 +224,7 @@ int32_t mgmtInitNormalTables() {
tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction); "ntables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtNormalTableAction);
if (tsNormalTableSdb == NULL) { if (tsNormalTableSdb == NULL) {
mError("failed to init ntables data"); mError("failed to init ntables data");
return -1; return -1;
...@@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb ...@@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
*pTableOut = (STableInfo *) pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create ntable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -558,9 +558,11 @@ bool mgmtCheckQhandle(uint64_t qhandle) { ...@@ -558,9 +558,11 @@ bool mgmtCheckQhandle(uint64_t qhandle) {
} }
void mgmtSaveQhandle(void *qhandle) { void mgmtSaveQhandle(void *qhandle) {
mTrace("qhandle:%p is allocated", qhandle);
} }
void mgmtFreeQhandle(void *qhandle) { void mgmtFreeQhandle(void *qhandle) {
mTrace("qhandle:%p is freed", qhandle);
} }
int mgmtGetConns(SShowObj *pShow, void *pConn) { int mgmtGetConns(SShowObj *pShow, void *pConn) {
...@@ -673,72 +675,72 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn ...@@ -673,72 +675,72 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
return numOfRows; return numOfRows;
} }
void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
return; return;
} }
SCMKillQueryMsg *pKill = (SCMKillQueryMsg *) rpcMsg->pCont; SCMKillQueryMsg *pKill = pMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} else { } else {
code = mgmtKillQuery(pKill->queryId, rpcMsg->handle); code = mgmtKillQuery(pKill->queryId, pMsg->thandle);
} }
rpcRsp.code = code; rpcRsp.code = code;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
return; return;
} }
SCMKillStreamMsg *pKill = (SCMKillStreamMsg *) rpcMsg->pCont; SCMKillStreamMsg *pKill = pMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} else { } else {
code = mgmtKillStream(pKill->queryId, rpcMsg->handle); code = mgmtKillStream(pKill->queryId, pMsg->thandle);
} }
rpcRsp.code = code; rpcRsp.code = code;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
return; return;
} }
SCMKillConnMsg *pKill = (SCMKillConnMsg *) rpcMsg->pCont; SCMKillConnMsg *pKill = pMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} else { } else {
code = mgmtKillConnection(pKill->queryId, rpcMsg->handle); code = mgmtKillConnection(pKill->queryId, pMsg->thandle);
} }
rpcRsp.code = code; rpcRsp.code = code;
......
...@@ -41,24 +41,27 @@ ...@@ -41,24 +41,27 @@
typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessShowMsg(SRpcMsg *rpcMsg);
static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont); static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont);
static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void *tsMgmtShellRpc = NULL; static void *tsMgmtShellRpc = NULL;
static void *tsMgmtTranQhandle = NULL; static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0}; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
int32_t mgmtInitShell() { int32_t mgmtInitShell() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg);
tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT");
...@@ -84,9 +87,6 @@ int32_t mgmtInitShell() { ...@@ -84,9 +87,6 @@ int32_t mgmtInitShell() {
return -1; return -1;
} }
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg);
mPrint("server connection to shell is opened"); mPrint("server connection to shell is opened");
return 0; return 0;
} }
...@@ -104,7 +104,7 @@ void mgmtCleanUpShell() { ...@@ -104,7 +104,7 @@ void mgmtCleanUpShell() {
} }
} }
void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SRpcMsg *rpcMsg)) { void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) {
tsMgmtProcessShellMsgFp[showType] = fp; tsMgmtProcessShellMsgFp[showType] = fp;
} }
...@@ -117,107 +117,117 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { ...@@ -117,107 +117,117 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) {
} }
void mgmtProcessTranRequest(SSchedMsg *sched) { void mgmtProcessTranRequest(SSchedMsg *sched) {
SRpcMsg *rpcMsg = sched->msg; SQueuedMsg *queuedMsg = sched->msg;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(queuedMsg->pCont);
free(rpcMsg); free(queuedMsg);
} }
void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
SRpcMsg *queuedRpcMsg = malloc(sizeof(SRpcMsg));
memcpy(queuedRpcMsg, rpcMsg, sizeof(SRpcMsg));
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.msg = queuedRpcMsg; schedMsg.msg = queuedMsg;
schedMsg.fp = mgmtProcessTranRequest; schedMsg.fp = mgmtProcessTranRequest;
taosScheduleTask(tsMgmtTranQhandle, &schedMsg); taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
} }
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (sdbGetRunStatus() != SDB_STATUS_SERVING) { if (sdbGetRunStatus() != SDB_STATUS_SERVING) {
mTrace("shell msg is ignored since SDB is not ready"); mgmtProcessMsgWhileNotReady(rpcMsg);
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = TSDB_CODE_NOT_READY, .msgType = 0};
rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
mTrace("%s is received", taosMsg[rpcMsg->msgType]); if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) {
if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) {
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg);
rpcFreeCont(rpcMsg->pCont);
} else {
mgmtAddToTranRequest(rpcMsg);
}
} else {
mError("%s is not processed", taosMsg[rpcMsg->msgType]);
mgmtProcessUnSupportMsg(rpcMsg); mgmtProcessUnSupportMsg(rpcMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return;
} }
}
static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (pUser == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
SQueuedMsg queuedMsg = {0};
queuedMsg.thandle = rpcMsg->handle;
queuedMsg.msgType = rpcMsg->msgType;
queuedMsg.contLen = rpcMsg->contLen;
queuedMsg.pCont = rpcMsg->pCont;
queuedMsg.pUser = pUser;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg);
rpcFreeCont(rpcMsg->pCont);
} else {
SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg));
queuedMsg->thandle = rpcMsg->handle;
queuedMsg->msgType = rpcMsg->msgType;
queuedMsg->contLen = rpcMsg->contLen;
queuedMsg->pCont = rpcMsg->pCont;
queuedMsg->pUser = pUser;
mgmtAddToShellQueue(queuedMsg);
}
}
SCMShowMsg *pShowMsg = rpcMsg->pCont; static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { if (mgmtCheckRedirect(pMsg->thandle)) {
return; return;
} }
} }
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE);
return;
}
if (!tsMgmtShowMetaFp[pShowMsg->type]) {
mError("show type:%d %s is not support", pShowMsg->type, taosMsg[pShowMsg->type]);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
return;
}
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SCMShowRsp *pShowRsp = rpcMallocCont(size); SCMShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) { if (pShowRsp == NULL) {
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
rpcSendResponse(&rpcRsp);
return; return;
} }
int32_t code; SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen));
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { pShow->signature = pShow;
code = TSDB_CODE_INVALID_MSG_TYPE; pShow->type = pShowMsg->type;
pShow->payloadLen = htons(pShowMsg->payloadLen);
strcpy(pShow->db, pShowMsg->db);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow);
int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle);
if (code == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->thandle,
.pCont = pShowRsp,
.contLen = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns,
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
} else { } else {
SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[pShowMsg->type], code);
pShow->signature = pShow; mgmtFreeQhandle(pShow);
pShow->type = pShowMsg->type; rpcFreeCont(pShowRsp);
strcpy(pShow->db, pShowMsg->db);
mTrace("pShow:%p is allocated", pShow);
// set the table name query condition
pShow->payloadLen = htons(pShowMsg->payloadLen);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow);
if (tsMgmtShowMetaFp[pShowMsg->type]) {
code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle);
if (code == 0) {
size = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
} else {
mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type,
taosMsg[(uint8_t) pShowMsg->type], code);
free(pShow);
}
} else {
code = TSDB_CODE_OPS_NOT_SUPPORT;
}
} }
rpcRsp.code = code;
rpcRsp.pCont = pShowRsp;
rpcRsp.contLen = size;
rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
int32_t rowsToRead = 0; int32_t rowsToRead = 0;
int32_t size = 0; int32_t size = 0;
int32_t rowsRead = 0; int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) rpcMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
/* /*
...@@ -226,16 +236,14 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -226,16 +236,14 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
*/ */
if (!mgmtCheckQhandle(pRetrieve->qhandle)) { if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle);
rpcRsp.code = TSDB_CODE_INVALID_QHANDLE; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_QHANDLE);
rpcSendResponse(&rpcRsp);
return; return;
} }
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
if (pShow->signature != (void *)pShow) { if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); mError("pShow:%p, query memory is corrupted", pShow);
rpcRsp.code = TSDB_CODE_MEMORY_CORRUPTED; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED);
rpcSendResponse(&rpcRsp);
return; return;
} else { } else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
...@@ -258,10 +266,9 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -258,10 +266,9 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
// if free flag is set, client wants to clean the resources // if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle);
if (rowsRead < 0) { if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS;
rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS;
rpcFreeCont(pRsp); rpcFreeCont(pRsp);
return; return;
} }
...@@ -269,8 +276,13 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -269,8 +276,13 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
rpcRsp.pCont = pRsp; SRpcMsg rpcRsp = {
rpcRsp.contLen = size; .handle = pMsg->thandle,
.pCont = pRsp,
.contLen = size,
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
if (rowsToRead == 0) { if (rowsToRead == 0) {
...@@ -278,21 +290,19 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -278,21 +290,19 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
} }
} }
static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
//SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont; //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont;
//mgmtSaveQueryStreamList(pHBMsg); //mgmtSaveQueryStreamList(pHBMsg);
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) { if (pHBRsp == NULL) {
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
rpcSendResponse(&rpcRsp);
return; return;
} }
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("conn:%p is already released while process heart beat msg", rpcMsg->handle); mError("conn:%p is already released while process heart beat msg", pMsg->thandle);
return; return;
} }
...@@ -320,8 +330,13 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { ...@@ -320,8 +330,13 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
pHBRsp->streamId = 0; pHBRsp->streamId = 0;
pHBRsp->killConnection = 0; pHBRsp->killConnection = 0;
rpcRsp.pCont = pHBRsp; SRpcMsg rpcRsp = {
rpcRsp.contLen = sizeof(SCMHeartBeatRsp); .handle = pMsg->thandle,
.pCont = pHBRsp,
.contLen = sizeof(SCMHeartBeatRsp),
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -340,13 +355,13 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr ...@@ -340,13 +355,13 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
} }
} }
static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) rpcMsg->pCont; SCMConnectMsg *pConnectMsg = pMsg->pCont;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("conn:%p is already released while process connect msg", rpcMsg->handle); mError("thandle:%p is already released while process connect msg", pMsg->thandle);
return; return;
} }
...@@ -450,6 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { ...@@ -450,6 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
} }
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
mError("%s is not processed in shell", taosMsg[rpcMsg->msgType]);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = 0, .msgType = 0,
.pCont = 0, .pCont = 0,
...@@ -459,3 +475,26 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { ...@@ -459,3 +475,26 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) {
mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]);
SRpcMsg rpcRsp = {
.msgType = 0,
.pCont = 0,
.contLen = 0,
.code = TSDB_CODE_NOT_READY,
.handle = rpcMsg->handle
};
rpcSendResponse(&rpcRsp);
}
void mgmtSendSimpleResp(void *thandle, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = 0,
.pCont = 0,
.contLen = 0,
.code = code,
.handle = thandle
};
rpcSendResponse(&rpcRsp);
}
...@@ -165,7 +165,7 @@ int32_t mgmtInitSuperTables() { ...@@ -165,7 +165,7 @@ int32_t mgmtInitSuperTables() {
mgmtSuperTableActionInit(); mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); "stables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init stables data"); mError("failed to init stables data");
return -1; return -1;
......
此差异已折叠。
...@@ -33,9 +33,9 @@ static int32_t mgmtUpdateUser(SUserObj *pUser); ...@@ -33,9 +33,9 @@ static int32_t mgmtUpdateUser(SUserObj *pUser);
static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg); static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg); static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg);
static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() { ...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() {
SUserObj tObj; SUserObj tObj;
tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtUserAction);
if (tsUserSdb == NULL) { if (tsUserSdb == NULL) {
mError("failed to init user data"); mError("failed to init user data");
return -1; return -1;
...@@ -337,52 +337,40 @@ SUserObj *mgmtGetUserFromConn(void *pConn) { ...@@ -337,52 +337,40 @@ SUserObj *mgmtGetUserFromConn(void *pConn) {
return NULL; return NULL;
} }
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code;
SUserObj *pUser = pMsg->pUser;
if (pUser->superAuth) { if (pUser->superAuth) {
SCMCreateUserMsg *pCreate = rpcMsg->pCont; SCMCreateUserMsg *pCreate = pMsg->pCont;
rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is created by %s", pCreate->user, pUser->user); mLPrint("user:%s is created by %s", pCreate->user, pUser->user);
} }
} else { } else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); int32_t code;
if (pOperUser == NULL) { SUserObj *pOperUser = pMsg->pUser;
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); SCMAlterUserMsg *pAlter = pMsg->pCont;
return;
}
SCMAlterUserMsg *pAlter = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUser(pAlter->user); SUserObj *pUser = mgmtGetUser(pAlter->user);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER);
rpcSendResponse(&rpcRsp);
return; return;
} }
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
rpcSendResponse(&rpcRsp);
return; return;
} }
...@@ -405,13 +393,13 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { ...@@ -405,13 +393,13 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
if (hasRight) { if (hasRight) {
memset(pUser->pass, 0, sizeof(pUser->pass)); memset(pUser->pass, 0, sizeof(pUser->pass));
taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
rpcRsp.code = mgmtUpdateUser(pUser); code = mgmtUpdateUser(pUser);
mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code);
} else { } else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
return; return;
} }
...@@ -454,42 +442,34 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { ...@@ -454,42 +442,34 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
pUser->writeAuth = 1; pUser->writeAuth = 1;
} }
rpcRsp.code = mgmtUpdateUser(pUser); code = mgmtUpdateUser(pUser);
mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code);
} else { } else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
return; return;
} }
rpcRsp.code = TSDB_CODE_NO_RIGHTS; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); int32_t code;
if (pOperUser == NULL) { SUserObj *pOperUser = pMsg->pUser;
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return ;
}
SCMDropUserMsg *pDrop = rpcMsg->pCont; SCMDropUserMsg *pDrop = pMsg->pCont;
SUserObj *pUser = mgmtGetUser(pDrop->user); SUserObj *pUser = mgmtGetUser(pDrop->user);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER);
rpcSendResponse(&rpcRsp);
return ; return ;
} }
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
rpcSendResponse(&rpcRsp);
return ; return ;
} }
...@@ -511,13 +491,13 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { ...@@ -511,13 +491,13 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) {
} }
if (hasRight) { if (hasRight) {
rpcRsp.code = mgmtDropUser(pUser->pAcct, pDrop->user); code = mgmtDropUser(pUser->pAcct, pDrop->user);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user); mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user);
} }
} else { } else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; code = TSDB_CODE_NO_RIGHTS;
} }
rpcSendResponse(&rpcRsp); mgmtSendSimpleResp(pMsg->thandle, code);
} }
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
...@@ -42,6 +43,9 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t ...@@ -42,6 +43,9 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtVgroupActionInit() { static void mgmtVgroupActionInit() {
SVgObj tObj; SVgObj tObj;
...@@ -69,7 +73,7 @@ int32_t mgmtInitVgroups() { ...@@ -69,7 +73,7 @@ int32_t mgmtInitVgroups() {
mgmtVgroupActionInit(); mgmtVgroupActionInit();
tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMnodeDir, mgmtVgroupAction);
if (tsVgroupSdb == NULL) { if (tsVgroupSdb == NULL) {
mError("failed to init vgroups data"); mError("failed to init vgroups data");
return -1; return -1;
...@@ -114,6 +118,7 @@ int32_t mgmtInitVgroups() { ...@@ -114,6 +118,7 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
mTrace("vgroup is initialized"); mTrace("vgroup is initialized");
return 0; return 0;
...@@ -123,19 +128,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { ...@@ -123,19 +128,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
} }
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
terrno = TSDB_CODE_ACTION_IN_PROGRESS;
}
terrno = 0;
return sid;
}
/* /*
* TODO: check if there is enough sids * TODO: check if there is enough sids
*/ */
...@@ -155,21 +147,25 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { ...@@ -155,21 +147,25 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
pDb->vgTimer = NULL; pDb->vgTimer = NULL;
} }
SVgObj *mgmtCreateVgroup(SDbObj *pDb) { void mgmtCreateVgroup(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL) {
mError("thandle:%p, failed to create vgroup, db not found", pMsg->thandle);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
return;
}
SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1); SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1);
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs();
// based on load balance, create a new one
if (mgmtAllocVnodes(pVgroup) != 0) { if (mgmtAllocVnodes(pVgroup) != 0) {
mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes); mError("thandle:%p, db:%s no enough dnode to alloc %d vnodes", pMsg->thandle, pDb->name, pVgroup->numOfVnodes);
free(pVgroup); free(pVgroup);
pDb->vgStatus = TSDB_VG_STATUS_FULL; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
taosTmrReset(mgmtProcessVgTimer, 5000, pDb, tsMgmtTmr, &pDb->vgTimer); return;
return NULL;
} }
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions); pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions);
pVgroup->numOfTables = 0; pVgroup->numOfTables = 0;
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
...@@ -179,11 +175,16 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { ...@@ -179,11 +175,16 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
sdbInsertRow(tsVgroupSdb, pVgroup, 0); sdbInsertRow(tsVgroupSdb, pVgroup, 0);
mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); mPrint("thandle:%p, vgroup:%d is created in mnode, db:%s replica:%d", pMsg->thandle, pVgroup->vgId, pDb->name,
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) pVgroup->numOfVnodes);
mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("thandle:%p, vgroup:%d, dnode:%s vnode:%d", pMsg->thandle, pVgroup->vgId,
taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
}
return pVgroup; pMsg->ahandle = pVgroup;
pMsg->expected = pVgroup->numOfVnodes;
mgmtSendCreateVgroupMsg(pVgroup, pMsg);
} }
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
...@@ -514,13 +515,13 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { ...@@ -514,13 +515,13 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) return NULL; if (pDb == NULL) return NULL;
SMDCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
if (pVPeers == NULL) return NULL; if (pVnode == NULL) return NULL;
pVPeers->vnode = htonl(vnode); pVnode->vnode = htonl(vnode);
pVPeers->cfg = pDb->cfg; pVnode->cfg = pDb->cfg;
SVnodeCfg *pCfg = &pVPeers->cfg; SVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId); pCfg->vgId = htonl(pVgroup->vgId);
pCfg->maxSessions = htonl(pCfg->maxSessions); pCfg->maxSessions = htonl(pCfg->maxSessions);
pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize);
...@@ -534,13 +535,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { ...@@ -534,13 +535,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
SVnodeDesc *vpeerDesc = pVPeers->vpeerDesc; SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); vpeerDesc[j].vgId = htonl(pVgroup->vgId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip);
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
} }
return pVPeers; return pVnode;
} }
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
...@@ -558,7 +560,11 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { ...@@ -558,7 +560,11 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
} }
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMnodeDnodePort + 1}; SRpcIpSet ipSet = {
.numOfIps = pVgroup->numOfVnodes,
.inUse = 0,
.port = tsDnodeMnodePort
};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) { for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
ipSet.ip[i] = pVgroup->vnodeGid[i].ip; ipSet.ip[i] = pVgroup->vnodeGid[i].ip;
} }
...@@ -566,7 +572,12 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { ...@@ -566,7 +572,12 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
} }
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMnodeDnodePort + 1}; SRpcIpSet ipSet = {
.ip[0] = ip,
.numOfIps = 1,
.inUse = 0,
.port = tsDnodeMnodePort
};
return ipSet; return ipSet;
} }
...@@ -574,19 +585,54 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo ...@@ -574,19 +585,54 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = ahandle, .handle = ahandle,
.pCont = pCreate, .pCont = pCreate,
.contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
}; };
mgmtSendMsgToDnode(ipSet, &rpcMsg); mgmtSendMsgToDnode(ipSet, &rpcMsg);
} }
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle); mTrace("send create vgroup:%d msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
} }
}
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle;
queueMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
queueMsg->code = rpcMsg->code;
queueMsg->successed++;
}
SVgObj *pVgroup = queueMsg->ahandle;
mTrace("thandle:%p, vgroup:%d create vnode rsp received, ahandle:%p code:%d received:%d successed:%d expected:%d",
queueMsg->thandle, pVgroup->vgId, rpcMsg->handle, rpcMsg->code, queueMsg->received, queueMsg->successed,
queueMsg->expected);
if (queueMsg->received != queueMsg->expected) return;
if (queueMsg->received == queueMsg->successed) {
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle;
newMsg->pDb = queueMsg->pDb;
newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mgmtAddToShellQueue(newMsg);
} else {
sdbDeleteRow(tsVgroupSdb, pVgroup);
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
}
free(queueMsg);
} }
\ No newline at end of file
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "tutil.h" #include "tutil.h"
char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; char configDir[TSDB_FILENAME_LEN] = "/etc/taos";
char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; char tsVnodeDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char logDir[TSDB_FILENAME_LEN] = "~/TDengineLog"; char logDir[TSDB_FILENAME_LEN] = "~/TDengineLog";
char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
......
...@@ -35,7 +35,9 @@ ...@@ -35,7 +35,9 @@
#include "ttimer.h" #include "ttimer.h"
char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; char configDir[TSDB_FILENAME_LEN] = "/etc/taos";
char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; char tsVnodeDir[TSDB_FILENAME_LEN] = {0};
char tsDnodeDir[TSDB_FILENAME_LEN] = {0};
char tsMnodeDir[TSDB_FILENAME_LEN] = {0};
char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char logDir[TSDB_FILENAME_LEN] = "/var/log/taos"; char logDir[TSDB_FILENAME_LEN] = "/var/log/taos";
char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
#include <Winsock2.h> #include <Winsock2.h>
char configDir[TSDB_FILENAME_LEN] = "C:/TDengine/cfg"; char configDir[TSDB_FILENAME_LEN] = "C:/TDengine/cfg";
char tsDirectory[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char tsVnodeDir[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log"; char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log";
char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script"; char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script";
......
...@@ -34,6 +34,12 @@ char *taosAddIntHash(void *handle, uint64_t key, char *pData); ...@@ -34,6 +34,12 @@ char *taosAddIntHash(void *handle, uint64_t key, char *pData);
int32_t taosHashInt(void *handle, uint64_t key); int32_t taosHashInt(void *handle, uint64_t key);
void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *));
char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *));
int32_t taosGetIntHashSize(void *handle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -50,7 +50,9 @@ extern int tscEmbedded; ...@@ -50,7 +50,9 @@ extern int tscEmbedded;
extern int64_t tsMsPerDay[2]; extern int64_t tsMsPerDay[2];
extern char configDir[]; extern char configDir[];
extern char tsDirectory[]; extern char tsVnodeDir[];
extern char tsDnodeDir[];
extern char tsMnodeDir[];
extern char dataDir[]; extern char dataDir[];
extern char logDir[]; extern char logDir[];
extern char scriptDir[]; extern char scriptDir[];
...@@ -263,9 +265,6 @@ SGlobalConfig *tsGetConfigOption(const char *option); ...@@ -263,9 +265,6 @@ SGlobalConfig *tsGetConfigOption(const char *option);
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
extern char tsMgmtDirectory[];
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -26,7 +26,7 @@ typedef struct { ...@@ -26,7 +26,7 @@ typedef struct {
IHashNode **hashList; IHashNode **hashList;
int32_t maxSessions; int32_t maxSessions;
int32_t dataSize; int32_t dataSize;
int32_t (*hashFp)(void *, uint64_t key); int32_t (*hashFp)(void *, uint64_t key);
pthread_mutex_t mutex; pthread_mutex_t mutex;
} IHashObj; } IHashObj;
...@@ -186,3 +186,93 @@ void taosCleanUpIntHash(void *handle) { ...@@ -186,3 +186,93 @@ void taosCleanUpIntHash(void *handle) {
memset(pObj, 0, sizeof(IHashObj)); memset(pObj, 0, sizeof(IHashObj));
free(pObj); free(pObj);
} }
void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
if (fp != NULL) fp(pNode->data);
free(pNode);
pNode = pNext;
}
}
free(pObj->hashList);
}
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_destroy(&pObj->mutex);
memset(pObj, 0, sizeof(IHashObj));
free(pObj);
}
char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
char * pData = NULL;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return NULL;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
int flag = fp(pNode->data);
if (flag) {
pData = pNode->data;
goto VisitEnd;
}
pNode = pNext;
}
}
}
VisitEnd:
pthread_mutex_unlock(&pObj->mutex);
return pData;
}
int32_t taosGetIntHashSize(void *handle) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
char * pData = NULL;
int32_t num = 0;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return NULL;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
num++;
pNode = pNext;
}
}
}
pthread_mutex_unlock(&pObj->mutex);
return num;
}
\ No newline at end of file
...@@ -33,7 +33,7 @@ typedef struct { ...@@ -33,7 +33,7 @@ typedef struct {
SHashNode **hashList; SHashNode **hashList;
uint32_t maxSessions; uint32_t maxSessions;
uint32_t dataSize; uint32_t dataSize;
uint32_t (*hashFp)(void *, char *string); uint32_t (*hashFp)(void *, char *string);
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SHashObj; } SHashObj;
......
...@@ -58,6 +58,7 @@ enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO }; ...@@ -58,6 +58,7 @@ enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
// the TSDB repository configuration // the TSDB repository configuration
typedef struct { typedef struct {
int8_t precision; int8_t precision;
int32_t vgId;
int32_t tsdbId; int32_t tsdbId;
int32_t maxTables; // maximum number of tables this repository can have int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy int32_t daysPerFile; // day per file sharding policy
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册