未验证 提交 9e65741b 编写于 作者: D dapan1121 提交者: GitHub

Merge branch '3.0' into feature/qnode

......@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
typedef bool (*RpcRfp)(int32_t code);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char * label; // for debug purpose
int numOfThreads; // number of threads to handle connections
......
......@@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
bool taosValidIpAndPort(uint32_t ip, uint16_t port);
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(TdSocketPtr pSocket);
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen);
......
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
......@@ -130,10 +130,10 @@ _OVER:
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
SDnodeTrans * pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) {
......@@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
void * pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......@@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn));
rpcInit.localPort = pDnode->data.serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads;
......
......@@ -258,6 +258,7 @@ typedef struct {
int32_t authVersion;
SHashObj* readDbs;
SHashObj* writeDbs;
SRWLatch lock;
} SUserObj;
typedef struct {
......
......@@ -26,7 +26,6 @@ int32_t mndInitSma(SMnode *pMnode);
void mndCleanupSma(SMnode *pMnode);
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist);
#ifdef __cplusplus
}
......
......@@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
#ifdef __cplusplus
}
......
......@@ -44,16 +44,17 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
int32_t mndInitDb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DB,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndDbActionEncode,
.decodeFp = (SdbDecodeFp)mndDbActionDecode,
.insertFp = (SdbInsertFp)mndDbActionInsert,
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete};
SSdbTable table = {
.sdbType = SDB_DB,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndDbActionEncode,
.decodeFp = (SdbDecodeFp)mndDbActionDecode,
.insertFp = (SdbInsertFp)mndDbActionInsert,
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbReq);
......@@ -61,7 +62,6 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
......@@ -194,6 +194,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
terrno = 0;
......@@ -222,17 +223,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock);
SArray *pOldRetensions = pOld->cfg.pRetensions;
pOld->updateTime = pNew->updateTime;
pOld->cfgVersion = pNew->cfgVersion;
pOld->vgVersion = pNew->vgVersion;
memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg));
pNew->cfg.pRetensions = pOldRetensions;
pOld->cfg.buffer = pNew->cfg.buffer;
pOld->cfg.pages = pNew->cfg.pages;
pOld->cfg.pageSize = pNew->cfg.pageSize;
pOld->cfg.daysPerFile = pNew->cfg.daysPerFile;
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2;
pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod;
pOld->cfg.walLevel = pNew->cfg.walLevel;
pOld->cfg.strict = pNew->cfg.strict;
pOld->cfg.cacheLastRow = pNew->cfg.cacheLastRow;
pOld->cfg.replications = pNew->cfg.replications;
taosWUnLockLatch(&pOld->lock);
return 0;
}
static int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) { return sdbGetTableVer(pMnode->pSdb, SDB_VGROUP); }
static inline int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
return sdbGetTableVer(pSdb, SDB_VGROUP);
}
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) {
SSdb *pSdb = pMnode->pSdb;
......@@ -638,69 +651,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pages = pDb->cfg.pages;
alterReq.pageSize = pDb->cfg.pageSize;
alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
}
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += +sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
static int32_t mndBuilAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
......@@ -736,7 +687,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pIter == NULL) break;
if (pVgroup->dbUid == pNew->uid) {
if (mndBuilAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
......@@ -752,19 +703,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
if (pTrans == NULL) goto UPDATE_DB_OVER;
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
mndTransSetDbInfo(pTrans, pOld);
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
UPDATE_DB_OVER:
_OVER:
mndTransDrop(pTrans);
return code;
}
......@@ -778,7 +729,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_DB_OVER;
goto _OVER;
}
mDebug("db:%s, start to alter", alterReq.db);
......@@ -786,24 +737,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
pDb = mndAcquireDb(pMnode, alterReq.db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_DB_OVER;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto ALTER_DB_OVER;
goto _OVER;
}
if (mndCheckAlterDropCompactDbAuth(pUser, pDb) != 0) {
goto ALTER_DB_OVER;
goto _OVER;
}
SDbObj dbObj = {0};
memcpy(&dbObj, pDb, sizeof(SDbObj));
dbObj.cfg.numOfRetensions = 0;
dbObj.cfg.pRetensions = NULL;
code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
if (code != 0) {
goto ALTER_DB_OVER;
goto _OVER;
}
dbObj.cfgVersion++;
......@@ -811,7 +764,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
ALTER_DB_OVER:
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
}
......@@ -831,13 +784,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto GET_DB_CFG_OVER;
goto _OVER;
}
pDb = mndAcquireDb(pMnode, cfgReq.db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto GET_DB_CFG_OVER;
goto _OVER;
}
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
......@@ -866,7 +819,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto GET_DB_CFG_OVER;
goto _OVER;
}
tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp);
......@@ -876,9 +829,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
code = 0;
GET_DB_CFG_OVER:
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr());
}
......@@ -1097,7 +1050,8 @@ _OVER:
return code;
}
void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) {
int32_t numOfTables = 0;
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;
......@@ -1108,8 +1062,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
*num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vindex++;
}
......@@ -1117,6 +1070,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
}
sdbCancelFetch(pSdb, pIter);
return numOfTables;
}
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
......@@ -1170,8 +1124,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
return -1;
}
int32_t numOfTable = 0;
mndGetDBTableNum(pDb, pMnode, &numOfTable);
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
......@@ -1195,7 +1148,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto USE_DB_OVER;
goto _OVER;
}
char *p = strchr(usedbReq.db, '.');
......@@ -1206,12 +1159,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto USE_DB_OVER;
goto _OVER;
}
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++;
} else {
usedbRsp.vgVersion = usedbReq.vgVersion;
}
......@@ -1232,15 +1184,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
} else {
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto USE_DB_OVER;
goto _OVER;
}
if (mndCheckUseDbAuth(pUser, pDb) != 0) {
goto USE_DB_OVER;
goto _OVER;
}
if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) {
goto USE_DB_OVER;
goto _OVER;
}
code = 0;
......@@ -1252,7 +1204,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto USE_DB_OVER;
goto _OVER;
}
tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp);
......@@ -1260,7 +1212,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
pReq->pRsp = pRsp;
pReq->rspLen = contLen;
USE_DB_OVER:
_OVER:
if (code != 0) {
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
}
......@@ -1298,8 +1250,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
continue;
}
int32_t numOfTable = 0;
mndGetDBTableNum(pDb, pMnode, &numOfTable);
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName);
......@@ -1514,9 +1465,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)b, false);
}
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
......@@ -1544,7 +1492,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int32_t *numOfTables = p1;
*numOfTables += pVgroup->numOfTables;
return true;
}
......@@ -1594,49 +1541,3 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) {
SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SUserIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
code = mndProcessGetSmaReq(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
// TODO GET INDEX FROM FULLTEXT
code = -1;
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
} else {
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
pReq->pRsp = pRsp;
pReq->rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0) {
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
}
return code;
}
......@@ -40,6 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
static int32_t mndProcessGetSmaReq(SNodeMsg *pReq);
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
......@@ -56,6 +57,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
......@@ -686,7 +688,7 @@ _OVER:
return code;
}
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj *pSma = NULL;
......@@ -715,6 +717,51 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexR
}
mndReleaseSma(pMnode, pSma);
return code;
}
static int32_t mndProcessGetSmaReq(SNodeMsg *pReq) {
SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SUserIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
// TODO GET INDEX FROM FULLTEXT
code = -1;
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
} else {
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
pReq->pRsp = pRsp;
pReq->rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0) {
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
}
return code;
}
......
......@@ -1050,7 +1050,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
if (code != 0) {
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since %s", terrstr());
}
return code;
......@@ -1058,7 +1058,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
if (code != 0) {
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to execute undoActions since %s", terrstr());
}
return code;
......
......@@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pUser->lock);
terrno = 0;
......@@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
taosWLockLatch(&pOld->lock);
pOld->updateTime = pNew->updateTime;
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs);
taosWUnLockLatch(&pOld->lock);
return 0;
}
......@@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
}
memcpy(&newUser, pUser, sizeof(SUserObj));
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL) {
goto _OVER;
}
......@@ -586,8 +592,10 @@ static int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUser
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = pUser->superUser;
pRsp->version = pUser->authVersion;
taosRLockLatch(&pUser->lock);
pRsp->readDbs = mndDupDbHash(pUser->readDbs);
pRsp->writeDbs = mndDupDbHash(pUser->writeDbs);
taosRUnLockLatch(&pUser->lock);
pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pRsp->createdDbs) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -21,8 +21,8 @@
#include "mndShow.h"
#include "mndTrans.h"
#define TSDB_VGROUP_VER_NUMBER 1
#define TSDB_VGROUP_RESERVE_SIZE 64
#define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
......@@ -34,19 +34,21 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_VGROUP,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete};
SSdbTable table = {
.sdbType = SDB_VGROUP,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
......@@ -66,29 +68,29 @@ void mndCleanupVgroup(SMnode *pMnode) {}
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) goto VG_ENCODE_OVER;
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0;
VG_ENCODE_OVER:
_OVER:
if (terrno != 0) {
mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -103,41 +105,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_VGROUP_VER_NUMBER) {
if (sver != VGROUP_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto VG_DECODE_OVER;
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj));
if (pRow == NULL) goto VG_DECODE_OVER;
if (pRow == NULL) goto _OVER;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto VG_DECODE_OVER;
if (pVgroup == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
}
}
SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
terrno = 0;
VG_DECODE_OVER:
_OVER:
if (terrno != 0) {
mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
taosMemoryFreeClear(pRow);
......@@ -254,6 +256,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return pReq;
}
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeReq alterReq = {0};
alterReq.vgVersion = pVgroup->version;
alterReq.buffer = pDb->cfg.buffer;
alterReq.pages = pDb->cfg.pages;
alterReq.pageSize = pDb->cfg.pageSize;
alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
alterReq.selfIndex = v;
}
}
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
contLen += +sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id;
......@@ -372,12 +436,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto ALLOC_VGROUP_OVER;
goto _OVER;
}
pArray = mndBuildDnodesArray(pMnode);
if (pArray == NULL) {
goto ALLOC_VGROUP_OVER;
goto _OVER;
}
mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
......@@ -410,7 +474,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
goto ALLOC_VGROUP_OVER;
goto _OVER;
}
allocedVgroups++;
......@@ -421,7 +485,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
ALLOC_VGROUP_OVER:
_OVER:
if (code != 0) taosMemoryFree(pVgroups);
taosArrayDestroy(pArray);
return code;
......@@ -492,7 +556,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return 0;
}
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -533,14 +597,13 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
// default 3 replica
for (int32_t i = 0; i < 3; ++i) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
char buf1[20] = {0};
const char *role = syncStr(pVgroup->vnodeGid[i].role);
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf1, false);
......@@ -597,13 +660,12 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t cols = 0;
// int32_t dnodeId = pShow->replica;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
......
......@@ -46,9 +46,21 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
}
uint32_t ip = 0;
if (pInit->connType == TAOS_CONN_SERVER) {
ip = taosGetIpv4FromFqdn(pInit->localFqdn);
if (ip == 0xFFFFFFFF) {
tError("invalid fqdn: %s", pInit->localFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR;
taosMemoryFree(pRpc);
return NULL;
}
}
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->tcphandle =
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
if (pRpc->tcphandle == NULL) {
taosMemoryFree(pRpc);
return NULL;
......
......@@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
uv_os_sock_t fds[2];
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
goto End;
......@@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
}
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
tError("failed to bind, reason: %s", terrstr());
goto End;
}
if (false == addHandleToAcceptloop(srv)) {
goto End;
}
......
......@@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
return 0;
}
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SocketFd fd;
int32_t reuse;
// printf("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port);
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck1(fd);
return false;
}
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(fd);
return false;
}
pSocket->refId = 0;
pSocket->fd = fd;
/* set REUSEADDR option, so the portnumber can be re-used */
reuse = 1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
/* bind socket to server address */
if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return false;
}
taosCloseSocket(&pSocket);
return true;
}
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SocketFd fd;
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import threading
import multiprocessing as mp
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import datetime as dt
import time
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: offset return valid
'''
return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
# tdSql.prepare()
# self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# test base case
self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
# self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
# --------------- case -------------------
# create tables
def create_tables(self,dbname,stbname,count):
tdSql.execute("use %s" %dbname)
tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
for i in range(count):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%3000 == 0:
tdSql.execute(sql)
sql = pre_create
# print(time.time())
# end sql
if sql != pre_create:
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
def newcur(self,host,cfg):
user = "root"
password = "taosdata"
port =6030
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop):
host = "chenhaoran02"
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
tsql.execute("use %s" %dbname)
tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tcountStop=int(tcountStop)
tcountStart=int(tcountStart)
count=tcountStop-tcountStart
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
# print(type(tcountStop),type(tcountStart))
for i in range(tcountStart,tcountStop):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%20000 == 0:
# print(sql)
tsql.execute(sql)
sql = pre_create
# print(time.time())
# end sql
if sql != pre_create:
# print(sql)
tsql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedCreate=count/spendTime
# tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
# insert data
def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount):
tdSql.execute("use %s" %dbname)
pre_insert = "insert into "
sql = pre_insert
tcount=tcountStop-tcountStart
allRows=tcount*rowCount
tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbname, allRows))
exeStartTime=time.time()
for i in range(tcountStart,tcountStop):
sql += " %s_%d values "%(stbname,i)
for j in range(rowCount):
sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j)
if j >0 and j%5000 == 0:
# print(sql)
tdSql.execute(sql)
sql = "insert into %s_%d values " %(stbname,i)
# end sql
if sql != pre_insert:
# print(sql)
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedInsert=allRows/spendTime
# tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert))
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
def test_case1(self):
tdLog.debug("-----create database and tables test------- ")
tdSql.execute("drop database if exists db1")
tdSql.execute("drop database if exists db4")
tdSql.execute("drop database if exists db6")
tdSql.execute("drop database if exists db8")
tdSql.execute("drop database if exists db12")
tdSql.execute("drop database if exists db16")
#create database and tables;
# tdSql.execute("create database db11 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)")
# tdSql.execute("create database db12 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,))
# t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,))
# t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,))
# t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,))
count=50000
vgroups=1
threads = []
threadNumbers=2
for i in range(threadNumbers):
threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
# self.new_create_tables("db1", "stb1", 15*10000)
# self.new_create_tables("db1", "stb1", 15*10000)
# tdSql.execute("create database db4 vgroups 4")
# self.create_tables("db4", "stb4", 30*10000)
# tdSql.execute("create database db6 vgroups 6")
# self.create_tables("db6", "stb6", 30*10000)
# tdSql.execute("create database db8 vgroups 8")
# self.create_tables("db8", "stb8", 30*10000)
# tdSql.execute("create database db12 vgroups 12")
# self.create_tables("db12", "stb12", 30*10000)
# tdSql.execute("create database db16 vgroups 16")
# self.create_tables("db16", "stb16", 30*10000)
return
# test case2 base:insert data
def test_case2(self):
tdLog.debug("-----insert data test------- ")
# drop database
tdSql.execute("drop database if exists db1")
tdSql.execute("drop database if exists db4")
tdSql.execute("drop database if exists db6")
tdSql.execute("drop database if exists db8")
tdSql.execute("drop database if exists db12")
tdSql.execute("drop database if exists db16")
#create database and tables;
tdSql.execute("create database db1 vgroups 1")
self.create_tables("db1", "stb1", 1*100)
self.insert_data("db1", "stb1", self.ts, 1*50,1*10000)
tdSql.execute("create database db4 vgroups 4")
self.create_tables("db4", "stb4", 1*100)
self.insert_data("db4", "stb4", self.ts, 1*100,1*10000)
tdSql.execute("create database db6 vgroups 6")
self.create_tables("db6", "stb6", 1*100)
self.insert_data("db6", "stb6", self.ts, 1*100,1*10000)
tdSql.execute("create database db8 vgroups 8")
self.create_tables("db8", "stb8", 1*100)
self.insert_data("db8", "stb8", self.ts, 1*100,1*10000)
tdSql.execute("create database db12 vgroups 12")
self.create_tables("db12", "stb12", 1*100)
self.insert_data("db12", "stb12", self.ts, 1*100,1*10000)
tdSql.execute("create database db16 vgroups 16")
self.create_tables("db16", "stb16", 1*100)
self.insert_data("db16", "stb16", self.ts, 1*100,1*10000)
return
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
# updatecfgDict = {'debugFlag': 135}
# updatecfgDict = {'fqdn': 135}
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: offset return valid
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data1("t1", self.ts, 1000*10000)
self.insert_data1("t4", self.ts, 1000*10000)
# test base case
# self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
# self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int,i2 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table st1(ts timestamp, i1 int ,i2 int) tags(area int) sma(i2) ");
tdSql.execute("create table t4 using st1 tags(1)");
return
# insert data1
def insert_data(self, tbname, ts_start, count):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, i )
if i >0 and i%30000 == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
def insert_data1(self, tbname, ts_start, count):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d,%d)"%(ts_start + i*1000, i , i+1)
if i >0 and i%30000 == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
# def test_case1(self):
# #
# # limit base function
# #
# # base no where
# sql = "select * from t1 limit 10"
# tdSql.waitedQuery(sql, 10, WAITS)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -141,6 +141,9 @@ int32_t shellRunCommand(char *command) {
*p++ = '\\';
}
break;
default:
*p++ = '\\';
break;
}
*p++ = c;
esc = false;
......
......@@ -21,7 +21,7 @@ static void shellWorkAsClient() {
SRpcInit rpcInit = {0};
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
SRpcMsg rpcRsp = {0};
void *clientRpc = NULL;
void * clientRpc = NULL;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass);
......@@ -116,6 +116,7 @@ static void shellWorkAsServer() {
}
SRpcInit rpcInit = {0};
memcpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
rpcInit.localPort = pArgs->port;
rpcInit.label = "CHK";
rpcInit.numOfThreads = tsNumOfRpcThreads;
......@@ -126,7 +127,7 @@ static void shellWorkAsServer() {
void *serverRpc = rpcOpen(&rpcInit);
if (serverRpc == NULL) {
printf("failed to init net test server since %s", terrstr());
printf("failed to init net test server since %s\n", terrstr());
} else {
printf("network test server is initialized, port:%u\n", pArgs->port);
taosSetSignal(SIGTERM, shellNettestHandler);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册