diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 1281ce98ac3299096934f69d0985f045707a2e2e..d8220fe846350665f90782e0ae65004d815b9dd2 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -57,11 +57,10 @@ void dnodeCleanupMClient() { } static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { - if (dnodeProcessMgmtRspFp[pMsg->msgType]) { (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + dError("%s is not processed in mclient", taosMsg[pMsg->msgType]); } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 89b29508d5eccef740b56212e86fdd4cccc29a95..65fac0155bbebad9e47e3193bc6cbac26c750cd3 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -226,20 +226,18 @@ static void dnodeCheckDataDirOpenned(char *dir) { static int32_t dnodeInitStorage() { struct stat dirstat; - strcpy(tsDirectory, dataDir); if (stat(dataDir, &dirstat) < 0) { mkdir(dataDir, 0755); } - char fileName[128]; - sprintf(fileName, "%s/tsdb", tsDirectory); - mkdir(fileName, 0755); - sprintf(fileName, "%s/data", tsDirectory); - mkdir(fileName, 0755); - sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory); - sprintf(tsDirectory, "%s/tsdb", dataDir); + sprintf(tsMnodeDir, "%s/mnode", dataDir); + sprintf(tsVnodeDir, "%s/vnode", dataDir); + sprintf(tsDnodeDir, "%s/dnode", dataDir); + mkdir(tsMnodeDir, 0755); + mkdir(tsVnodeDir, 0755); + mkdir(tsDnodeDir, 0755); - dnodeCheckDataDirOpenned(dataDir); + dnodeCheckDataDirOpenned(tsDnodeDir); dPrint("storage directory is initialized"); return 0; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 7131440ec12ed16c8100fe5085acf3f8cb675645..63e4a290aa5b4615559fb798ba5d4d3b97937f8d 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -29,6 +29,7 @@ typedef struct { int32_t vgId; // global vnode group ID + int32_t vnode; int32_t status; // status: master, slave, notready, deleting int32_t refCount; // reference count int64_t version; @@ -43,7 +44,7 @@ typedef struct { static int32_t dnodeOpenVnodes(); static void dnodeCleanupVnodes(); -static int32_t dnodeOpenVnode(int32_t vgId); +static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir); static void dnodeCleanupVnode(SVnodeObj *pVnode); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static void dnodeDropVnode(SVnodeObj *pVnode); @@ -79,7 +80,25 @@ int32_t dnodeInitMgmt() { } taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); - return dnodeOpenVnodes(); + SMDCreateVnodeMsg cfg; + cfg.cfg.vgId = 1; + cfg.cfg.precision = 0; + cfg.vnode = 1; + cfg.cfg.maxSessions = 1000; + cfg.cfg.daysPerFile = 10; + + dnodeCreateVnode(&cfg); + SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId); + dnodeDropVnode(pVnode); + + dnodeCreateVnode(&cfg); + SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId); + dnodeCleanupVnodes(); + + dnodeOpenVnodes(); + dnodeCleanupVnodes(); + + //return dnodeOpenVnodes(); } void dnodeCleanupMgmt() { @@ -98,14 +117,13 @@ void dnodeMgmt(SRpcMsg *pMsg) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + SRpcMsg rsp; + rsp.handle = pMsg->handle; + rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + rsp.pCont = NULL; + rpcSendResponse(&rsp); } - SRpcMsg rsp; - rsp.handle = pMsg->handle; - rsp.code = terrno; - rsp.pCont = NULL; - rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); // free the received message } @@ -150,28 +168,55 @@ void dnodeReleaseVnode(void *pVnode) { } static int32_t dnodeOpenVnodes() { - dPrint("open all vnodes"); + DIR *dir = opendir(tsVnodeDir); + if (dir == NULL) { + return TSDB_CODE_NO_WRITE_ACCESS; + } + + int32_t numOfVnodes = 0; + struct dirent *de = NULL; + while ((de = readdir(dir)) != NULL) { + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue; + if (de->d_type & DT_DIR) { + if (strncmp("vnode", de->d_name, 5) != 0) continue; + int32_t vnode = atoi(de->d_name + 5); + if (vnode == 0) continue; + + char tsdbDir[TSDB_FILENAME_LEN]; + sprintf(tsdbDir, "%s/%s", tsVnodeDir, de->d_name); + int32_t code = dnodeOpenVnode(vnode, tsdbDir); + if (code == 0) { + numOfVnodes++; + } + } + } + closedir(dir); + + dPrint("all vnodes is opened, num:%d", numOfVnodes); return TSDB_CODE_SUCCESS; } static void dnodeCleanupVnodes() { - dPrint("clean all vnodes"); + int32_t num = taosGetIntHashSize(tsDnodeVnodesHash); + taosCleanUpIntHashWithFp(tsDnodeVnodesHash, dnodeCleanupVnode); + dPrint("all vnodes is opened, num:%d", num); } -static int32_t dnodeOpenVnode(int32_t vgId) { - char rootDir[TSDB_FILENAME_LEN] = {0}; - sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId); - +static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { void *pTsdb = tsdbOpenRepo(rootDir); - if (pTsdb != NULL) { + if (pTsdb == NULL) { + dError("failed to open vnode:%d in dir:%s, reason:%s", vnode, rootDir, tstrerror(terrno)); return terrno; } + //STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); + SVnodeObj vnodeObj; - vnodeObj.vgId = vgId; + vnodeObj.vgId = vnode; //tsdbInfo->tsdbCfg.vgId; + vnodeObj.vnode = vnode; //tsdbInfo->tsdbCfg.tsdbId; vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; - vnodeObj.version = 0; + vnodeObj.version = version; vnodeObj.wworker = dnodeAllocateWriteWorker(); vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.wal = NULL; @@ -182,6 +227,7 @@ static int32_t dnodeOpenVnode(int32_t vgId) { taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); + dTrace("open vnode:%d in %s", vnodeObj.vnode, rootDir); return TSDB_CODE_SUCCESS; } @@ -210,11 +256,12 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { pVnode->tsdb = NULL; } - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + dTrace("cleanup vnode:%d", pVnode->vnode); } static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { STsdbCfg tsdbCfg; + tsdbCfg.vgId = pVnodeCfg->cfg.vgId; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.tsdbId = pVnodeCfg->vnode; tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; @@ -225,15 +272,16 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.maxCacheSize = -1; char rootDir[TSDB_FILENAME_LEN] = {0}; - sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); + sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); - if (pTsdb != NULL) { + if (pTsdb == NULL) { return terrno; } SVnodeObj vnodeObj; vnodeObj.vgId = pVnodeCfg->cfg.vgId; + vnodeObj.vnode = pVnodeCfg->vnode; vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; @@ -264,6 +312,7 @@ static void dnodeDropVnode(SVnodeObj *pVnode) { } dnodeCleanupVnode(pVnode); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); } static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { @@ -281,9 +330,8 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { } else { rpcRsp.code = dnodeCreateVnode(pCreate); } - + rpcRsp.code = TSDB_CODE_SUCCESS; rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { @@ -301,7 +349,6 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { } rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { @@ -321,7 +368,6 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { } rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { @@ -342,7 +388,6 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { // dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } - static void dnodeSendStatusMsg(void *handle, void *tmrId) { taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); if (tsStatusTimer == NULL) { diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index 6e75ddc68ef197e5a12dda4b964d3e7cfb6e0ec6..971e7b37f518d0b3cbe4b4c6df72579e996a9461 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + dError("%s is not processed in mserver", taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 4250297e05b2edcea953dfd4df2bce27987a4fa4..20fc948844e814326cfabfdde397f15a054a4b4d 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -60,11 +60,13 @@ int32_t dnodeInitRead() { maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; + dPrint("dnode read is opened"); return 0; } void dnodeCleanupRead() { taosCloseQset(readQset); + dPrint("dnode read is closed"); } void dnodeRead(SRpcMsg *pMsg) { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index a0a0a1b5fd0c88227bb0b0fc9872e1ebad27f70c..c5d44cc95a77bf2df58769737428af53c6d1b512 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -76,35 +76,43 @@ int32_t dnodeInitWrite() { wWorkerPool.writeWorker[i].workerId = i; } + dPrint("dnode write is opened"); return 0; } void dnodeCleanupWrite() { free(wWorkerPool.writeWorker); + dPrint("dnode write is closed"); } void dnodeWrite(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - int32_t contLen = 0; - int32_t numOfVnodes = 0; - int32_t vgId = 0; SRpcContext *pRpcContext = NULL; - // parse head, get number of vnodes; + int32_t numOfVnodes = 0; + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + // TODO parse head, get number of vnodes; + numOfVnodes = 1; + } else { + numOfVnodes = 1; + } - if ( numOfVnodes > 1) { + if (numOfVnodes > 1) { pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext->numOfVnodes = numOfVnodes; } while (leftLen > 0) { - // todo: parse head, get vgId, contLen + SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; + int32_t vgId = htonl(pHead->vgId); + int32_t contLen = htonl(pHead->contLen); - // get pVnode from vgId void *pVnode = dnodeGetVnode(vgId); if (pVnode == NULL) { - + leftLen -= contLen; + pCont -= contLen; continue; } @@ -118,20 +126,37 @@ void dnodeWrite(SRpcMsg *pMsg) { taos_queue queue = dnodeGetVnodeWworker(pVnode); taosWriteQitem(queue, &writeMsg); - + // next vnode leftLen -= contLen; - pCont -= contLen; + pCont -= contLen; + queuedMsgNum++; + } + + if (queuedMsgNum == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = NULL, + .contLen = 0, + .code = TSDB_CODE_INVALID_VGROUP_ID, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); } } void *dnodeAllocateWriteWorker() { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; + taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); + if (queue != NULL) return queue; if (pWorker->qset == NULL) { pWorker->qset = taosOpenQset(); if (pWorker->qset == NULL) return NULL; + taosAddIntoQset(pWorker->qset, queue); + wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; + pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); @@ -140,14 +165,11 @@ void *dnodeAllocateWriteWorker() { dError("failed to create thread to process read queue, reason:%s", strerror(errno)); taosCloseQset(pWorker->qset); } - } - - taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); - if (queue) { + } else { taosAddIntoQset(pWorker->qset, queue); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } - + return queue; } diff --git a/src/inc/mnode.h b/src/inc/mnode.h index d206577cdc63f4d2f1d9c4a124c09b02b7ed123a..64c75dca7d36d9196e017540b512974893639fc0 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -39,12 +39,7 @@ extern "C" { #include "ttimer.h" #include "tutil.h" -// internal globals -extern char version[]; -extern void *tsMgmtTmr; -extern char tsMgmtDirectory[]; - -typedef struct { + typedef struct { uint32_t privateIp; int32_t sid; uint32_t moduleStatus; @@ -87,11 +82,6 @@ typedef struct { int32_t vnode; } SVnodeGid; -typedef struct { - int32_t sid; - int32_t vgId; // vnode group ID -} STableGid; - typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; @@ -248,16 +238,32 @@ typedef struct { int16_t offset[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS]; void * signature; - uint16_t payloadLen; /* length of payload*/ - char payload[]; /* payload for wildcard match in show tables */ + uint16_t payloadLen; + char payload[]; } SShowObj; -//mgmtSystem +typedef struct { + uint8_t msgType; + int8_t expected; + int8_t received; + int8_t successed; + int32_t contLen; + int32_t code; + void *ahandle; + void *thandle; + void *pCont; + SDbObj *pDb; + SUserObj *pUser; +} SQueuedMsg; + int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); void mgmtStopSystem(); +extern char version[]; +extern void *tsMgmtTmr; +extern char tsMnodeDir[]; #ifdef __cplusplus } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b83b81d358a8fbcba28bb9bad44e5765431a1ef6..34ef1eb67ca34c3f4977e8224e11ab0cd7f41a1b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -238,11 +238,20 @@ typedef struct SSchema { } SSchema; typedef struct { + int32_t vgId; int32_t vnode; //the index of vnode uint32_t ip; } SVnodeDesc; typedef struct { + int32_t contLen; + int32_t vgId; +} SWriteMsgHead; + +typedef struct { + int32_t contLen; + int32_t vgId; + int8_t tableType; int16_t numOfColumns; int16_t numOfTags; @@ -250,7 +259,6 @@ typedef struct { int32_t sversion; int32_t tagDataLen; int32_t sqlDataLen; - int32_t contLen; int32_t numOfVPeers; uint64_t uid; uint64_t superTableUid; @@ -336,6 +344,7 @@ typedef struct { } SMgmtHead; typedef struct { + int32_t vgId; int32_t sid; int32_t numOfVPeers; uint64_t uid; diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 31904666586cd5a1f7c349f07b75d0e90fc48762..7f9fd9622c7b782e1d796fc36f6340198702934b 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -28,22 +28,6 @@ bool mgmtCheckQhandle(uint64_t qhandle); void mgmtSaveQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle); -enum { - TSDB_PROCESS_CREATE_VGROUP, - TSDB_PROCESS_CREATE_VGROUP_GET_META, - TSDB_PROCESS_CREATE_TABLE, - TSDB_PROCESS_CREATE_TABLE_GET_META, -}; - -typedef struct { - void *thandle; // come from uplayer - void *ahandle; // object to process - void *cont; // additional information of object to process - int32_t type; // the type of sync process - int32_t received; // num of received, such as numOfVnodes - int32_t contLen; // the length of additional information -} SProcessInfo; - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index ff089dad7ebb110388002d93eaf9252562a4c1cd..a9af568547802fca47a913a46cddf94951a9af72 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -23,13 +23,16 @@ extern "C" { int32_t mgmtInitShell(); void mgmtCleanUpShell(); -void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); +void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg)); typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); +void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); +void mgmtSendSimpleResp(void *thandle, int32_t code); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 2d94d48882ab5b337041f06af11787bec7b747e8..514b903db8010b741c26a2b3d268f5ac0f5586cc 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -33,7 +33,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); @@ -44,10 +43,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); -void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); -void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); -void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 975a10dc47ca2303b52ef02c637358370577ea10..7ac97076c7f9113cb45082692213e02a5c76fe27 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -29,7 +29,7 @@ void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); -SVgObj *mgmtCreateVgroup(SDbObj *pDb); +void mgmtCreateVgroup(SQueuedMsg *pMsg); int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 81f410548e13e606cea3375434e1635c962824e3..fae237c4f55d1ab4ed4a2eca0c47759141831207 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -56,10 +56,10 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { } if (selectedVnode == -1) { - mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); + mError("alloc vnode failed, free vnodes:%d", pDnode->numOfFreeVnodes); return -1; } else { - mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); + mTrace("allocate vnode:%d, last allocated vnode:%d", selectedVnode, lastAllocVode); pVgroup->vnodeGid[0].vnode = selectedVnode; pDnode->lastAllocVnode = selectedVnode + 1; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0c7518eef73d32844aecd7b10846465c9edabe41..af032ab08b3b9372638a5cf1a90e4df83f00b255 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -198,7 +198,7 @@ int32_t mgmtInitChildTables() { tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize, - "ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction); + "ctables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtChildTableAction); if (tsChildTableSdb == NULL) { mError("failed to init child table data"); return -1; @@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + mTrace("table:%s, create ctable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index 4670663222dde7b01521976e6ba7912f2247c701..22884cc7d005c0d595d1bf0229c4e9b1e8f373b6 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -78,45 +78,12 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) { (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg); } else { - dError("%s is not processed", taosMsg[rpcMsg->msgType]); + mError("%s is not processed in dclient", taosMsg[rpcMsg->msgType]); } rpcFreeCont(rpcMsg->pCont); } - -//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { -// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -// if (rpcMsg->handle == NULL) return; -// -// SProcessInfo *info = rpcMsg->handle; -// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META); -// -// STableInfo *pTable = info->ahandle; -// if (rpcMsg->code != TSDB_CODE_SUCCESS) { -// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code); -// mgmtSetTableDirty(pTable, true); -// } else { -// mTrace("table:%s, created in dnode", pTable->tableId); -// mgmtSetTableDirty(pTable, false); -// } -// -// if (rpcMsg->code != TSDB_CODE_SUCCESS) { -// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = rpcMsg->code, .msgType = 0}; -// rpcSendResponse(&rpcMsg); -// } else { -// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) { -// mTrace("table:%s, start to process get meta", pTable->tableId); -// mgmtProcessGetTableMeta(pTable, rpcMsg->handle); -// } else { -// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; -// rpcSendResponse(&rpcMsg); -// } -// } -// -// free(info); -//} -// //static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { // mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); //} @@ -125,27 +92,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { // mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); //} // -//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { -// mTrace("create vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -// if (rpcMsg->handle == NULL) return; -// -// SProcessInfo *info = rpcMsg->handle; -// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META); -// -// info->received++; -// SVgObj *pVgroup = info->ahandle; -// -// bool isGetMeta = false; -// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) { -// isGetMeta = true; -// } -// -// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes); -// if (info->received == pVgroup->numOfVnodes) { -// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta); -// free(info); -// } -//} // //static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { // mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 469c903d833a265bd4307dceae420f89b9859aab..98a2e387285889bbcf5fdcf94e1e700c7fa3a6c7 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { (*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg); } else { - mError("%s is not processed", taosMsg[rpcMsg->msgType]); + mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]); } rpcFreeCont(rpcMsg->pCont); @@ -210,22 +210,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // } //} // -//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { -// mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); -// for (int i = 0; i < pVgroup->numOfVnodes; ++i) { -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); -// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); -// } -//} -// -//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); -// SMDCreateVnodeMsg *pVpeer = mgmtBuildCreateVnodeMsg(pVgroup, vnode); -// if (pVpeer != NULL) { -// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_VNODE, pVpeer, sizeof(SMDCreateVnodeMsg), ahandle); -// } -//} -// //void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { // if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { // mError("invalid msg type:%d", msgType); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 5670d2a384a8e2376eeb824c5851dba96593ed77..12c34ad05762217a164a235ef554cced31c0218a 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -41,9 +41,9 @@ static int32_t mgmtDropDb(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); +static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -81,7 +81,7 @@ int32_t mgmtInitDbs() { SDbObj tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; - tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); + tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtDbAction); if (tsDbSdb == NULL) { mError("failed to init db data"); return -1; @@ -383,6 +383,7 @@ static void mgmtDropDbFromSdb(SDbObj *pDb) { } static int32_t mgmtDropDb(SDbObj *pDb) { + if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { bool finished = mgmtCheckDropDbFinished(pDb); if (!finished) { @@ -405,6 +406,7 @@ static int32_t mgmtDropDb(SDbObj *pDb) { } } +UNUSED_FUNC static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); if (pDb == NULL) { @@ -904,19 +906,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } -static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMCreateDbMsg *pCreate = (SCMCreateDbMsg *) rpcMsg->pCont; +static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + SCMCreateDbMsg *pCreate = pMsg->pCont; pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->daysPerFile = htonl(pCreate->daysPerFile); @@ -928,69 +921,58 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); + int32_t code; if (mgmtCheckExpired()) { - rpcRsp.code = TSDB_CODE_GRANT_EXPIRED; - } else if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_GRANT_EXPIRED; + } else if (!pMsg->pUser->writeAuth) { + code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtCreateDb(pUser->pAcct, pCreate); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is created by %s", pCreate->db, pUser->user); + code = mgmtCreateDb(pMsg->pUser->pAcct, pCreate); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("DB:%s is created by %s", pCreate->db, pMsg->pUser->user); } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } +static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SCMAlterDbMsg *pAlter = (SCMAlterDbMsg *) rpcMsg->pCont; + SCMAlterDbMsg *pAlter = pMsg->pCont; pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; - if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + int32_t code; + if (!pMsg->pUser->writeAuth) { + code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtAlterDb(pUser->pAcct, pAlter); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is altered by %s", pAlter->db, pUser->user); + code = mgmtAlterDb(pMsg->pUser->pAcct, pAlter); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("DB:%s is altered by %s", pAlter->db, pMsg->pUser->user); } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } +static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; -static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return ; - } - - if (pUser->superAuth) { - SCMDropDbMsg *pDrop = rpcMsg->pCont; - rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); - } + int32_t code; + if (pMsg->pUser->superAuth) { + code = TSDB_CODE_OPS_NOT_SUPPORT; + //SCMDropDbMsg *pDrop = rpcMsg->pCont; + //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); + //if (rpcRsp.code == TSDB_CODE_SUCCESS) { + // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); + //} } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + if (code != TSDB_CODE_SUCCESS) { + mgmtSendSimpleResp(pMsg->thandle, code); + } } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 576b7ebf6ef6088b79ecd7db99397faa4519cce1..c48b4477e5021dcead416b36d6c86b3823de10b6 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -43,7 +43,7 @@ static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -93,7 +93,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { memset(pVload, 0, sizeof(SVnodeLoad)); pVload->vnode = vnodeGid[i].vnode; pVload->vgId = vgId; - mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(vnodeGid[i].ip), vnodeGid[i].vnode, pVload->vgId); + mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(pDnode->privateIp), vnodeGid[i].vnode, pVload->vgId); mgmtCalcNumOfFreeVnodes(pDnode); } else { mError("dnode:%s, not in dnode DB!!!", taosIpStr(vnodeGid[i].ip)); @@ -527,21 +527,14 @@ bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) { return pDnode->status == TSDB_DN_STATUS_OFFLINE; } -void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMCfgDnodeMsg *pCmCfgDnode = (SCMCfgDnodeMsg *) rpcMsg->pCont; + SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip); - if (strcmp(pUser->pAcct->user, "root") != 0) { + if (strcmp(pMsg->pUser->pAcct->user, "root") != 0) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp); @@ -560,7 +553,7 @@ void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { } if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pUser->user); + mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pMsg->pUser->user); } rpcSendResponse(&rpcRsp); diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 3e142e595d15e93a6e7c42f8a7572794de9b1b10..66200e5a1407aa59e1e2a76f736c23eb0f82a7b3 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -31,7 +31,6 @@ #include "mgmtShell.h" static int32_t mgmtCheckMgmtRunning(); -char tsMgmtDirectory[128] = {0}; void *tsMgmtTmr = NULL; int32_t mgmtInitSystem() { @@ -41,7 +40,7 @@ int32_t mgmtInitSystem() { } struct stat dirstat; - bool fileExist = (stat(tsMgmtDirectory, &dirstat) == 0); + bool fileExist = (stat(tsMnodeDir, &dirstat) == 0); bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); if (asMaster || fileExist) { @@ -57,8 +56,8 @@ int32_t mgmtStartSystem() { mPrint("starting to initialize TDengine mgmt ..."); struct stat dirstat; - if (stat(tsMgmtDirectory, &dirstat) < 0) { - mkdir(tsMgmtDirectory, 0755); + if (stat(tsMnodeDir, &dirstat) < 0) { + mkdir(tsMnodeDir, 0755); } if (mgmtCheckMgmtRunning() != 0) { @@ -110,7 +109,7 @@ int32_t mgmtStartSystem() { return -1; } - if (sdbInitPeers(tsMgmtDirectory) < 0) { + if (sdbInitPeers(tsMnodeDir) < 0) { mError("failed to init peers"); return -1; } @@ -132,7 +131,7 @@ void mgmtStopSystem() { } mgmtCleanUpSystem(); - remove(tsMgmtDirectory); + remove(tsMnodeDir); } void mgmtCleanUpSystem() { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index d6623ee7009a8be93d0af69642fa6d04c74758cf..773467259945b8bd8f1abfe31e258351fc0eb74c 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -224,7 +224,7 @@ int32_t mgmtInitNormalTables() { tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, - "ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction); + "ntables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtNormalTableAction); if (tsNormalTableSdb == NULL) { mError("failed to init ntables data"); return -1; @@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + mTrace("table:%s, create ntable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index e16a7f166c7b1c25c34c4b662caedaf57acc7e0e..bb192ce8a56fd353da3dd9ce92046b3eab2154c2 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -558,9 +558,11 @@ bool mgmtCheckQhandle(uint64_t qhandle) { } void mgmtSaveQhandle(void *qhandle) { + mTrace("qhandle:%p is allocated", qhandle); } void mgmtFreeQhandle(void *qhandle) { + mTrace("qhandle:%p is freed", qhandle); } int mgmtGetConns(SShowObj *pShow, void *pConn) { @@ -673,72 +675,72 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn return numOfRows; } -void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillQueryMsg *pKill = (SCMKillQueryMsg *) rpcMsg->pCont; + SCMKillQueryMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillQuery(pKill->queryId, rpcMsg->handle); + code = mgmtKillQuery(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; rpcSendResponse(&rpcRsp); } -void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillStreamMsg *pKill = (SCMKillStreamMsg *) rpcMsg->pCont; + SCMKillStreamMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillStream(pKill->queryId, rpcMsg->handle); + code = mgmtKillStream(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; rpcSendResponse(&rpcRsp); } -void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillConnMsg *pKill = (SCMKillConnMsg *) rpcMsg->pCont; + SCMKillConnMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillConnection(pKill->queryId, rpcMsg->handle); + code = mgmtKillConnection(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 094848c8c081ba60495d28493ca6b05139d99a38..3898d62fdbbf6b31bbc30b5dfbc27e8478cf5578 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -41,24 +41,27 @@ typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); -static void mgmtProcessShowMsg(SRpcMsg *rpcMsg); -static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg); -static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont); -static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg); -static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg); +static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); +static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); +static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); +static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); static void *tsMgmtShellRpc = NULL; static void *tsMgmtTranQhandle = NULL; -static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0}; +static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); @@ -84,9 +87,6 @@ int32_t mgmtInitShell() { return -1; } - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); - mPrint("server connection to shell is opened"); return 0; } @@ -104,7 +104,7 @@ void mgmtCleanUpShell() { } } -void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SRpcMsg *rpcMsg)) { +void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) { tsMgmtProcessShellMsgFp[showType] = fp; } @@ -117,107 +117,117 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { } void mgmtProcessTranRequest(SSchedMsg *sched) { - SRpcMsg *rpcMsg = sched->msg; - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); + SQueuedMsg *queuedMsg = sched->msg; + (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg); + rpcFreeCont(queuedMsg->pCont); + free(queuedMsg); } -void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { - SRpcMsg *queuedRpcMsg = malloc(sizeof(SRpcMsg)); - memcpy(queuedRpcMsg, rpcMsg, sizeof(SRpcMsg)); - +void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { SSchedMsg schedMsg; - schedMsg.msg = queuedRpcMsg; + schedMsg.msg = queuedMsg; schedMsg.fp = mgmtProcessTranRequest; taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (sdbGetRunStatus() != SDB_STATUS_SERVING) { - mTrace("shell msg is ignored since SDB is not ready"); - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = TSDB_CODE_NOT_READY, .msgType = 0}; - rpcSendResponse(&rpcRsp); + mgmtProcessMsgWhileNotReady(rpcMsg); rpcFreeCont(rpcMsg->pCont); return; } - mTrace("%s is received", taosMsg[rpcMsg->msgType]); - if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) { - if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - } else { - mgmtAddToTranRequest(rpcMsg); - } - } else { - mError("%s is not processed", taosMsg[rpcMsg->msgType]); + if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) { mgmtProcessUnSupportMsg(rpcMsg); rpcFreeCont(rpcMsg->pCont); + return; } -} -static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + if (pUser == NULL) { + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); + rpcFreeCont(rpcMsg->pCont); + return; + } + + if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { + SQueuedMsg queuedMsg = {0}; + queuedMsg.thandle = rpcMsg->handle; + queuedMsg.msgType = rpcMsg->msgType; + queuedMsg.contLen = rpcMsg->contLen; + queuedMsg.pCont = rpcMsg->pCont; + queuedMsg.pUser = pUser; + (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg); + rpcFreeCont(rpcMsg->pCont); + } else { + SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg)); + queuedMsg->thandle = rpcMsg->handle; + queuedMsg->msgType = rpcMsg->msgType; + queuedMsg->contLen = rpcMsg->contLen; + queuedMsg->pCont = rpcMsg->pCont; + queuedMsg->pUser = pUser; + mgmtAddToShellQueue(queuedMsg); + } +} - SCMShowMsg *pShowMsg = rpcMsg->pCont; +static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { + SCMShowMsg *pShowMsg = pMsg->pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirect(pMsg->thandle)) { return; } } - int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); + return; + } + + if (!tsMgmtShowMetaFp[pShowMsg->type]) { + mError("show type:%d %s is not support", pShowMsg->type, taosMsg[pShowMsg->type]); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); + return; + } + + int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } - int32_t code; - if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { - code = TSDB_CODE_INVALID_MSG_TYPE; + SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); + pShow->signature = pShow; + pShow->type = pShowMsg->type; + pShow->payloadLen = htons(pShowMsg->payloadLen); + strcpy(pShow->db, pShowMsg->db); + memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); + + mgmtSaveQhandle(pShow); + pShowRsp->qhandle = htobe64((uint64_t) pShow); + + int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); + if (code == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pShowRsp, + .contLen = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns, + .code = code, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); } else { - SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); - pShow->signature = pShow; - pShow->type = pShowMsg->type; - strcpy(pShow->db, pShowMsg->db); - mTrace("pShow:%p is allocated", pShow); - - // set the table name query condition - pShow->payloadLen = htons(pShowMsg->payloadLen); - memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - - mgmtSaveQhandle(pShow); - pShowRsp->qhandle = htobe64((uint64_t) pShow); - if (tsMgmtShowMetaFp[pShowMsg->type]) { - code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); - if (code == 0) { - size = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; - } else { - mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, - taosMsg[(uint8_t) pShowMsg->type], code); - free(pShow); - } - } else { - code = TSDB_CODE_OPS_NOT_SUPPORT; - } + mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[pShowMsg->type], code); + mgmtFreeQhandle(pShow); + rpcFreeCont(pShowRsp); } - - rpcRsp.code = code; - rpcRsp.pCont = pShowRsp; - rpcRsp.contLen = size; - rpcSendResponse(&rpcRsp); } -static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - +static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; - SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) rpcMsg->pCont; + SRetrieveTableMsg *pRetrieve = pMsg->pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); /* @@ -226,16 +236,14 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { */ if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); - rpcRsp.code = TSDB_CODE_INVALID_QHANDLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_QHANDLE); return; } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - if (pShow->signature != (void *)pShow) { - mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); - rpcRsp.code = TSDB_CODE_MEMORY_CORRUPTED; - rpcSendResponse(&rpcRsp); + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { + mError("pShow:%p, query memory is corrupted", pShow); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); return; } else { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -258,10 +266,9 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); + rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); - if (rowsRead < 0) { - rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; + if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS; rpcFreeCont(pRsp); return; } @@ -269,8 +276,13 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { pRsp->numOfRows = htonl(rowsRead); pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision - rpcRsp.pCont = pRsp; - rpcRsp.contLen = size; + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pRsp, + .contLen = size, + .code = 0, + .msgType = 0 + }; rpcSendResponse(&rpcRsp); if (rowsToRead == 0) { @@ -278,21 +290,19 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { } } -static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont; //mgmtSaveQueryStreamList(pHBMsg); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); if (pHBRsp == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while process heart beat msg", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("conn:%p is already released while process heart beat msg", pMsg->thandle); return; } @@ -320,8 +330,13 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { pHBRsp->streamId = 0; pHBRsp->killConnection = 0; - rpcRsp.pCont = pHBRsp; - rpcRsp.contLen = sizeof(SCMHeartBeatRsp); + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pHBRsp, + .contLen = sizeof(SCMHeartBeatRsp), + .code = 0, + .msgType = 0 + }; rpcSendResponse(&rpcRsp); } @@ -340,13 +355,13 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr } } -static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) rpcMsg->pCont; +static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SCMConnectMsg *pConnectMsg = pMsg->pCont; SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while process connect msg", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("thandle:%p is already released while process connect msg", pMsg->thandle); return; } @@ -450,6 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { } static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { + mError("%s is not processed in shell", taosMsg[rpcMsg->msgType]); SRpcMsg rpcRsp = { .msgType = 0, .pCont = 0, @@ -459,3 +475,26 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { }; rpcSendResponse(&rpcRsp); } + +static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) { + mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]); + SRpcMsg rpcRsp = { + .msgType = 0, + .pCont = 0, + .contLen = 0, + .code = TSDB_CODE_NOT_READY, + .handle = rpcMsg->handle + }; + rpcSendResponse(&rpcRsp); +} + +void mgmtSendSimpleResp(void *thandle, int32_t code) { + SRpcMsg rpcRsp = { + .msgType = 0, + .pCont = 0, + .contLen = 0, + .code = code, + .handle = thandle + }; + rpcSendResponse(&rpcRsp); +} diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index ed4fa3b6a6a706b43a68084daae341a6b1d50d91..088792bfac537d5711872f249aa8a942433f6ac4 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -165,7 +165,7 @@ int32_t mgmtInitSuperTables() { mgmtSuperTableActionInit(); tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, - "stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); + "stables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtSuperTableAction); if (tsSuperTableSdb == NULL) { mError("failed to init stables data"); return -1; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 188083d516904165fde44c6a8a600f90200ca8bd..ee0f19b89bc3e1474acc3d76d2018f65f77ca8b5 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -44,14 +44,16 @@ extern void *tsNormalTableSdb; extern void *tsChildTableSdb; -static void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg); -static void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg); -static void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); int32_t mgmtInitTables() { int32_t code = mgmtInitSuperTables(); @@ -79,6 +81,7 @@ int32_t mgmtInitTables() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_META, mgmtProcessSuperTableMetaMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); return TSDB_CODE_SUCCESS; } @@ -131,168 +134,56 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } +static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { + SCMCreateTableMsg *pCreate = pMsg->pCont; + pCreate->numOfColumns = htons(pCreate->numOfColumns); + pCreate->numOfTags = htons(pCreate->numOfTags); + pCreate->sqlLen = htons(pCreate->sqlLen); -void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { - mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); - rpcRsp.code = TSDB_CODE_INVALID_DB; - rpcSendResponse(&rpcRsp); - return; - } - - SVgObj *pVgroup = mgmtCreateVgroup(pDb); - if (pVgroup == NULL) { - mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); - rpcRsp.code = TSDB_CODE_NO_ENOUGH_DNODES; - rpcSendResponse(&rpcRsp); - return; - } - - void *cont = rpcMallocCont(contLen); - if (cont == NULL) { - mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); - return; - } - - memcpy(cont, pCreate, contLen); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_VGROUP; - info->thandle = thandle; - info->ahandle = pVgroup; - info->cont = cont; - info->contLen = contLen; - - if (isGetMeta) { - info->type = TSDB_PROCESS_CREATE_VGROUP_GET_META; + SSchema *pSchema = (SSchema*) pCreate->schema; + for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = i; + pSchema++; } - mgmtSendCreateVgroupMsg(pVgroup, info); -} - -//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle); -// SRpcMsg rpcMsg = { -// .handle = ahandle, -// .pCont = pCreate, -// .contLen = htonl(pCreate->contLen), -// .code = 0, -// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE -// }; -// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg); -//} -// - - -void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - assert(pVgroup != NULL); - SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - int32_t sid = taosAllocateId(pVgroup->idPool); if (sid < 0) { - mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); + mTrace("thandle:%p, no enough sid in vgroup:%d, start to create a new one", pMsg->thandle, pVgroup->vgId); + mgmtCreateVgroup(pMsg); return; } + int32_t code; STableInfo *pTable; - SMDCreateTableMsg *pDCreate = NULL; + SMDCreateTableMsg *pMDCreate = NULL; if (pCreate->numOfColumns == 0) { - mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - rpcRsp.code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + mTrace("thandle:%p, create ctable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } else { - mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - rpcRsp.code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + mTrace("thandle:%p, create ntable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); - rpcSendResponse(&rpcRsp); + if (code != TSDB_CODE_SUCCESS) { + mTrace("thandle:%p, failed to create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId); + mgmtSendSimpleResp(pMsg->thandle, code); return; } - assert(pDCreate != NULL); - assert(pTable != NULL); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_TABLE; - info->thandle = thandle; - info->ahandle = pTable; SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - if (isGetMeta) { - info->type = TSDB_PROCESS_CREATE_TABLE_GET_META; - } - SRpcMsg rpcMsg = { - .handle = info, - .pCont = pCreate, - .contLen = htonl(pDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + .handle = pMsg, + .pCont = pCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); -} -int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { - mError("table:%s, failed to create table, db not selected", pCreate->tableId); - return TSDB_CODE_DB_NOT_SELECTED; - } - - STableInfo *pTable = mgmtGetTable(pCreate->tableId); - if (pTable != NULL) { - if (pCreate->igExists) { - mTrace("table:%s, table is already exist, think it success", pCreate->tableId); - return TSDB_CODE_SUCCESS; - } else { - mError("table:%s, failed to create table, table already exist", pCreate->tableId); - return TSDB_CODE_TABLE_ALREADY_EXIST; - } - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - assert(pAcct != NULL); - - int32_t code = mgmtCheckTableLimit(pAcct, pCreate->numOfColumns); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId); - return code; - } - - if (mgmtCheckExpired()) { - mError("table:%s, failed to create table, grant expired", pCreate->tableId); - return TSDB_CODE_GRANT_EXPIRED; - } - - if (pCreate->numOfTags != 0) { - mTrace("table:%s, start to create super table, tags:%d columns:%d", - pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); - return mgmtCreateSuperTable(pDb, pCreate); - } - - code = mgmtCheckTimeSeries(pCreate->numOfColumns); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId); - return TSDB_CODE_SUCCESS; - } - - SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); - if (pVgroup == NULL) { - mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); - } else { - mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle, isGetMeta); - } - - return TSDB_CODE_ACTION_IN_PROGRESS; + pMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); } int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { @@ -547,114 +438,142 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { } void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { + // TODO: if dirty, delete from sdb pTable->dirty = isDirty; } -void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SCMCreateTableMsg *pCreate = (SCMCreateTableMsg *) rpcMsg->pCont; - pCreate->numOfColumns = htons(pCreate->numOfColumns); - pCreate->numOfTags = htons(pCreate->numOfTags); - pCreate->sqlLen = htons(pCreate->sqlLen); + SCMCreateTableMsg *pCreate = pMsg->pCont; + mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId); - SSchema *pSchema = (SSchema*) pCreate->schema; - for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { - pSchema->bytes = htons(pSchema->bytes); - pSchema->colId = i; - pSchema++; + if (mgmtCheckExpired()) { + mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; } - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, need redirect message", pCreate->tableId); + if (!pMsg->pUser->writeAuth) { + mError("thandle:%p, failed to create table:%s, no rights", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - mError("table:%s, failed to create table, invalid user", pCreate->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + SAcctObj *pAcct = pMsg->pUser->pAcct; + int32_t code = mgmtCheckTableLimit(pAcct, htons(pCreate->numOfColumns)); + if (code != TSDB_CODE_SUCCESS) { + mError("thandle:%p, failed to create table:%s, exceed the limit", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } - if (!pUser->writeAuth) { - mError("table:%s, failed to create table, no rights", pCreate->tableId); - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + pMsg->pDb = mgmtGetDb(pCreate->db); + if (pMsg->pDb == NULL) { + mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - int32_t code = mgmtCreateTable(pCreate, rpcMsg->contLen, rpcMsg->handle, false); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcRsp.code = code; - rpcSendResponse(&rpcRsp); + STableInfo *pTable = mgmtGetTable(pCreate->tableId); + if (pTable != NULL) { + if (pCreate->igExists) { + mTrace("thandle:%p, table:%s is already exist", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("thandle:%p, failed to create table:%s, table already exist", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_TABLE_ALREADY_EXIST); + return; + } + } + + if (pCreate->numOfTags != 0) { + mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d", + pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); + code = mgmtCreateSuperTable(pMsg->pDb, pCreate); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + code = mgmtCheckTimeSeries(pCreate->numOfColumns); + if (code != TSDB_CODE_SUCCESS) { + mError("thandle:%p, failed to create table:%s, timeseries exceed the limit", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); + if (pVgroup == NULL) { + mTrace("thandle:%p, table:%s start to create a new vgroup", newMsg->thandle, pCreate->tableId); + mgmtCreateVgroup(newMsg); + } else { + mTrace("thandle:%p, create table:%s in vgroup:%d", newMsg->thandle, pCreate->tableId, pVgroup->vgId); + mgmtCreateTable(pVgroup, newMsg); } } -void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMDropTableMsg *pDrop = (SCMDropTableMsg *) rpcMsg->pCont; +void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { + SCMDropTableMsg *pDrop = pMsg->pCont; - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); + if (mgmtCheckRedirect(pMsg->thandle)) { + mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId); return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } if (!pUser->writeAuth) { mError("table:%s, failed to drop table, no rights", pDrop->tableId); - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId); if (pDb == NULL) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); - rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcRsp.code = code; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } } -void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { +void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) { return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } - SCMAlterTableMsg *pAlter = (SCMAlterTableMsg *) rpcMsg->pCont; + SCMAlterTableMsg *pAlter = pMsg->pCont; + int32_t code; if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } else { pAlter->type = htons(pAlter->type); pAlter->numOfCols = htons(pAlter->numOfCols); if (pAlter->numOfCols > 2) { mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); - rpcRsp.code = TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; } else { SDbObj *pDb = mgmtGetDb(pAlter->db); if (pDb) { @@ -662,17 +581,17 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - rpcRsp.code = mgmtAlterTable(pDb, pAlter); - if (rpcRsp.code == 0) { + code = mgmtAlterTable(pDb, pAlter); + if (code == 0) { mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; + code = TSDB_CODE_DB_NOT_SELECTED; } } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { @@ -707,21 +626,14 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { rpcSendResponse(&rpcRsp); } - -void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp; - rpcRsp.handle = rpcMsg->handle; - rpcRsp.pCont = NULL; - rpcRsp.contLen = 0; - - SCMTableInfoMsg *pInfo = rpcMsg->pCont; +void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { + SCMTableInfoMsg *pInfo = pMsg->pCont; pInfo->createFlag = htons(pInfo->createFlag); - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { mError("table:%s, failed to get table meta, invalid user", pInfo->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } @@ -729,12 +641,11 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { if (pTable == NULL) { if (pInfo->createFlag != 1) { mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } else { // on demand create table from super table if table does not exists - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirect(pMsg->thandle)) { mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId); return; } @@ -743,8 +654,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); if (pCreateMsg == NULL) { mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } @@ -752,41 +662,34 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { strcpy(pCreateMsg->tableId, pInfo->tableId); mError("table:%s, start to create table while get meta info", pInfo->tableId); - mgmtCreateTable(pCreateMsg, contLen, rpcMsg->handle, true); +// mgmtCreateTable(pCreateMsg, contLen, pMsg->thandle, true); } } else { - mgmtProcessGetTableMeta(pTable, rpcMsg->handle); + mgmtProcessGetTableMeta(pTable, pMsg->thandle); } } -void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp; - rpcRsp.handle = rpcMsg->handle; - rpcRsp.pCont = NULL; - rpcRsp.contLen = 0; - +void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while get mulit table meta", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("conn:%p is already released while get mulit table meta", pMsg->thandle); return; } bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } - SCMMultiTableInfoMsg *pInfo = rpcMsg->pCont; + SCMMultiTableInfoMsg *pInfo = pMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } @@ -823,29 +726,69 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { } } + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pMultiMeta; rpcRsp.contLen = pMultiMeta->contLen; rpcSendResponse(&rpcRsp); } -void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMSuperTableInfoMsg *pInfo = rpcMsg->pCont; +void mgmtProcessSuperTableMetaMsg(SQueuedMsg *pMsg) { + SCMSuperTableInfoMsg *pInfo = pMsg->pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pRsp; rpcRsp.contLen = msgLen; rpcSendResponse(&rpcRsp); } else { - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + } +} + +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("thandle:%p, create table:%s rsp received, ahandle:%p code:%d received:%d", + queueMsg->thandle, pTable->tableId, rpcMsg->handle, rpcMsg->code, queueMsg->received); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mgmtSetTableDirty(pTable, true); + //sdbDeleteRow(tsVgroupSdb, pVgroup); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mError("table:%s, failed to create in dnode, reason:%s, set it dirty", pTable->tableId, tstrerror(rpcMsg->code)); + mgmtSetTableDirty(pTable, true); + } else { + mTrace("table:%s, created in dnode", pTable->tableId); + mgmtSetTableDirty(pTable, false); + + if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mTrace("table:%s, start to process get meta", pTable->tableId); + mgmtAddToShellQueue(newMsg); + } else { + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } } -} \ No newline at end of file + + free(queueMsg); +} diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 821e553810d7f430dbe2eb8594eb31bf6e3ec34d..22c6cbc1dc0ca59c03183ae1765a761818c2d29e 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -33,9 +33,9 @@ static int32_t mgmtUpdateUser(SUserObj *pUser); static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); +static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -59,7 +59,7 @@ int32_t mgmtInitUsers() { SUserObj tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); + tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtUserAction); if (tsUserSdb == NULL) { mError("failed to init user data"); return -1; @@ -337,52 +337,40 @@ SUserObj *mgmtGetUserFromConn(void *pConn) { return NULL; } -static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } +static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + int32_t code; + SUserObj *pUser = pMsg->pUser; + if (pUser->superAuth) { - SCMCreateUserMsg *pCreate = rpcMsg->pCont; - rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { + SCMCreateUserMsg *pCreate = pMsg->pCont; + code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); + if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is created by %s", pCreate->user, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pOperUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMAlterUserMsg *pAlter = rpcMsg->pCont; + int32_t code; + SUserObj *pOperUser = pMsg->pUser; + + SCMAlterUserMsg *pAlter = pMsg->pCont; SUserObj *pUser = mgmtGetUser(pAlter->user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } @@ -405,13 +393,13 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { if (hasRight) { memset(pUser->pass, 0, sizeof(pUser->pass)); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); - rpcRsp.code = mgmtUpdateUser(pUser); - mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); + code = mgmtUpdateUser(pUser); + mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code); } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); return; } @@ -454,42 +442,34 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { pUser->writeAuth = 1; } - rpcRsp.code = mgmtUpdateUser(pUser); - mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); + code = mgmtUpdateUser(pUser); + mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code); } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); return; } - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); } -static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pOperUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return ; - } + int32_t code; + SUserObj *pOperUser = pMsg->pUser; - SCMDropUserMsg *pDrop = rpcMsg->pCont; + SCMDropUserMsg *pDrop = pMsg->pCont; SUserObj *pUser = mgmtGetUser(pDrop->user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return ; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return ; } @@ -511,13 +491,13 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { } if (hasRight) { - rpcRsp.code = mgmtDropUser(pUser->pAcct, pDrop->user); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { + code = mgmtDropUser(pUser->pAcct, pDrop->user); + if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index c4a11bd11ea269ba5a1cdc2dacc99348c8297de3..f953db6ca47ef11a9aacc5538b029fcc40ebaa62 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -24,6 +24,7 @@ #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtDnode.h" +#include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtTable.h" #include "mgmtVgroup.h" @@ -42,6 +43,9 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); + +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtVgroupActionInit() { SVgObj tObj; @@ -69,7 +73,7 @@ int32_t mgmtInitVgroups() { mgmtVgroupActionInit(); - tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); + tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMnodeDir, mgmtVgroupAction); if (tsVgroupSdb == NULL) { mError("failed to init vgroups data"); return -1; @@ -114,6 +118,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); mTrace("vgroup is initialized"); return 0; @@ -123,19 +128,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } -int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - terrno = TSDB_CODE_ACTION_IN_PROGRESS; - } - - terrno = 0; - return sid; -} - /* * TODO: check if there is enough sids */ @@ -155,21 +147,25 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { pDb->vgTimer = NULL; } -SVgObj *mgmtCreateVgroup(SDbObj *pDb) { +void mgmtCreateVgroup(SQueuedMsg *pMsg) { + SDbObj *pDb = pMsg->pDb; + if (pDb == NULL) { + mError("thandle:%p, failed to create vgroup, db not found", pMsg->thandle); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; - pVgroup->createdTime = taosGetTimestampMs(); - - // based on load balance, create a new one if (mgmtAllocVnodes(pVgroup) != 0) { - mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes); + mError("thandle:%p, db:%s no enough dnode to alloc %d vnodes", pMsg->thandle, pDb->name, pVgroup->numOfVnodes); free(pVgroup); - pDb->vgStatus = TSDB_VG_STATUS_FULL; - taosTmrReset(mgmtProcessVgTimer, 5000, pDb, tsMgmtTmr, &pDb->vgTimer); - return NULL; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); + return; } + pVgroup->createdTime = taosGetTimestampMs(); pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions); pVgroup->numOfTables = 0; pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); @@ -179,11 +175,16 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { sdbInsertRow(tsVgroupSdb, pVgroup, 0); - mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) - mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); + mPrint("thandle:%p, vgroup:%d is created in mnode, db:%s replica:%d", pMsg->thandle, pVgroup->vgId, pDb->name, + pVgroup->numOfVnodes); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + mPrint("thandle:%p, vgroup:%d, dnode:%s vnode:%d", pMsg->thandle, pVgroup->vgId, + taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); + } - return pVgroup; + pMsg->ahandle = pVgroup; + pMsg->expected = pVgroup->numOfVnodes; + mgmtSendCreateVgroupMsg(pVgroup, pMsg); } int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { @@ -514,13 +515,13 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) return NULL; - SMDCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); - if (pVPeers == NULL) return NULL; + SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); + if (pVnode == NULL) return NULL; - pVPeers->vnode = htonl(vnode); - pVPeers->cfg = pDb->cfg; + pVnode->vnode = htonl(vnode); + pVnode->cfg = pDb->cfg; - SVnodeCfg *pCfg = &pVPeers->cfg; + SVnodeCfg *pCfg = &pVnode->cfg; pCfg->vgId = htonl(pVgroup->vgId); pCfg->maxSessions = htonl(pCfg->maxSessions); pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); @@ -534,13 +535,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - SVnodeDesc *vpeerDesc = pVPeers->vpeerDesc; + SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); + vpeerDesc[j].vgId = htonl(pVgroup->vgId); + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); } - return pVPeers; + return pVnode; } SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { @@ -558,7 +560,11 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { } SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { - SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMnodeDnodePort + 1}; + SRpcIpSet ipSet = { + .numOfIps = pVgroup->numOfVnodes, + .inUse = 0, + .port = tsDnodeMnodePort + }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { ipSet.ip[i] = pVgroup->vnodeGid[i].ip; } @@ -566,7 +572,12 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { } SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { - SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMnodeDnodePort + 1}; + SRpcIpSet ipSet = { + .ip[0] = ip, + .numOfIps = 1, + .inUse = 0, + .port = tsDnodeMnodePort + }; return ipSet; } @@ -574,19 +585,54 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode); SRpcMsg rpcMsg = { - .handle = ahandle, - .pCont = pCreate, - .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE + .handle = ahandle, + .pCont = pCreate, + .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE }; mgmtSendMsgToDnode(ipSet, &rpcMsg); } void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { - mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle); + mTrace("send create vgroup:%d msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); } +} + +static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + queueMsg->code = rpcMsg->code; + queueMsg->successed++; + } + + SVgObj *pVgroup = queueMsg->ahandle; + mTrace("thandle:%p, vgroup:%d create vnode rsp received, ahandle:%p code:%d received:%d successed:%d expected:%d", + queueMsg->thandle, pVgroup->vgId, rpcMsg->handle, rpcMsg->code, queueMsg->received, queueMsg->successed, + queueMsg->expected); + + if (queueMsg->received != queueMsg->expected) return; + + if (queueMsg->received == queueMsg->successed) { + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mgmtAddToShellQueue(newMsg); + } else { + sdbDeleteRow(tsVgroupSdb, pVgroup); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } + + free(queueMsg); } \ No newline at end of file diff --git a/src/os/darwin/src/tdarwin.c b/src/os/darwin/src/tdarwin.c index ff9576542f2bda45db0d016d27a6957e0fd19c9a..7896592030f8e18e5d4be5efad0bf7c78100dfb8 100644 --- a/src/os/darwin/src/tdarwin.c +++ b/src/os/darwin/src/tdarwin.c @@ -34,7 +34,7 @@ #include "tutil.h" char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; -char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; +char tsVnodeDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char logDir[TSDB_FILENAME_LEN] = "~/TDengineLog"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index 98faffdfd27fb0dbdbd641141a2f8ec99ead3f49..bce4a8f13db7727cda101943529ebbd23481a32a 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -35,7 +35,9 @@ #include "ttimer.h" char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; -char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; +char tsVnodeDir[TSDB_FILENAME_LEN] = {0}; +char tsDnodeDir[TSDB_FILENAME_LEN] = {0}; +char tsMnodeDir[TSDB_FILENAME_LEN] = {0}; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char logDir[TSDB_FILENAME_LEN] = "/var/log/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 15e42d8948faf30c7f4462479fc5e2eed549ff50..83c6017b398c9321aa2fd8bf39e05fcc6871dcd3 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -33,7 +33,7 @@ #include char configDir[TSDB_FILENAME_LEN] = "C:/TDengine/cfg"; -char tsDirectory[TSDB_FILENAME_LEN] = "C:/TDengine/data"; +char tsVnodeDir[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log"; char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script"; diff --git a/src/util/inc/ihash.h b/src/util/inc/ihash.h index 9623f95bbd8b65ad4b2dbcae2f26112eb6beb1e1..1d7a8f79309dd3a5bfc2d8eee67f1b7bedb187ee 100644 --- a/src/util/inc/ihash.h +++ b/src/util/inc/ihash.h @@ -34,6 +34,12 @@ char *taosAddIntHash(void *handle, uint64_t key, char *pData); int32_t taosHashInt(void *handle, uint64_t key); +void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)); + +char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)); + +int32_t taosGetIntHashSize(void *handle); + #ifdef __cplusplus } #endif diff --git a/src/util/inc/tglobalcfg.h b/src/util/inc/tglobalcfg.h index 18523c36799390ecaecb24601c9cb2a32419f83a..993992ffcbf9a33b00761dd20141224f912db958 100644 --- a/src/util/inc/tglobalcfg.h +++ b/src/util/inc/tglobalcfg.h @@ -50,7 +50,9 @@ extern int tscEmbedded; extern int64_t tsMsPerDay[2]; extern char configDir[]; -extern char tsDirectory[]; +extern char tsVnodeDir[]; +extern char tsDnodeDir[]; +extern char tsMnodeDir[]; extern char dataDir[]; extern char logDir[]; extern char scriptDir[]; @@ -263,9 +265,6 @@ SGlobalConfig *tsGetConfigOption(const char *option); #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) -extern char tsMgmtDirectory[]; - - #ifdef __cplusplus } #endif diff --git a/src/util/src/ihash.c b/src/util/src/ihash.c index 8c492b03f867036d3fcb3a52f872f57142cba7ec..a61ce6654fe614ddbcd5c7720f8ec40d484161d5 100644 --- a/src/util/src/ihash.c +++ b/src/util/src/ihash.c @@ -26,7 +26,7 @@ typedef struct { IHashNode **hashList; int32_t maxSessions; int32_t dataSize; - int32_t (*hashFp)(void *, uint64_t key); + int32_t (*hashFp)(void *, uint64_t key); pthread_mutex_t mutex; } IHashObj; @@ -186,3 +186,93 @@ void taosCleanUpIntHash(void *handle) { memset(pObj, 0, sizeof(IHashObj)); free(pObj); } + + +void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) { + IHashObj * pObj; + IHashNode *pNode, *pNext; + + pObj = (IHashObj *)handle; + if (pObj == NULL || pObj->maxSessions <= 0) return; + + pthread_mutex_lock(&pObj->mutex); + + if (pObj->hashList) { + for (int i = 0; i < pObj->maxSessions; ++i) { + pNode = pObj->hashList[i]; + while (pNode) { + pNext = pNode->next; + if (fp != NULL) fp(pNode->data); + free(pNode); + pNode = pNext; + } + } + + free(pObj->hashList); + } + + pthread_mutex_unlock(&pObj->mutex); + + pthread_mutex_destroy(&pObj->mutex); + + memset(pObj, 0, sizeof(IHashObj)); + free(pObj); +} + +char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) { + IHashObj * pObj; + IHashNode *pNode, *pNext; + char * pData = NULL; + + pObj = (IHashObj *)handle; + if (pObj == NULL || pObj->maxSessions <= 0) return NULL; + + pthread_mutex_lock(&pObj->mutex); + + if (pObj->hashList) { + for (int i = 0; i < pObj->maxSessions; ++i) { + pNode = pObj->hashList[i]; + while (pNode) { + pNext = pNode->next; + int flag = fp(pNode->data); + if (flag) { + pData = pNode->data; + goto VisitEnd; + } + + pNode = pNext; + } + } + } + +VisitEnd: + + pthread_mutex_unlock(&pObj->mutex); + return pData; +} + +int32_t taosGetIntHashSize(void *handle) { + IHashObj * pObj; + IHashNode *pNode, *pNext; + char * pData = NULL; + int32_t num = 0; + + pObj = (IHashObj *)handle; + if (pObj == NULL || pObj->maxSessions <= 0) return NULL; + + pthread_mutex_lock(&pObj->mutex); + + if (pObj->hashList) { + for (int i = 0; i < pObj->maxSessions; ++i) { + pNode = pObj->hashList[i]; + while (pNode) { + pNext = pNode->next; + num++; + pNode = pNext; + } + } + } + + pthread_mutex_unlock(&pObj->mutex); + return num; +} \ No newline at end of file diff --git a/src/util/src/shash.c b/src/util/src/shash.c index 5be0dfa9739157ab231bdad7c52d7e15416736dd..da97af84bbc957ba102add1b34bff23d71c91d0e 100644 --- a/src/util/src/shash.c +++ b/src/util/src/shash.c @@ -33,7 +33,7 @@ typedef struct { SHashNode **hashList; uint32_t maxSessions; uint32_t dataSize; - uint32_t (*hashFp)(void *, char *string); + uint32_t (*hashFp)(void *, char *string); pthread_mutex_t mutex; } SHashObj; diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 57798b6a091401d2eb644b4337612428032b7ea6..8c3fadf486acb4be3997eed299f6aa528b9432f3 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -42,6 +42,7 @@ typedef enum { typedef struct { int8_t precision; + int32_t vgId; int32_t tsdbId; int32_t maxTables; // maximum number of tables this repository can have int32_t daysPerFile; // day per file sharding policy