提交 54584fd1 编写于 作者: S Shengliang Guan

seralize db msg

上级 e89dc532
...@@ -28,7 +28,6 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) ...@@ -28,7 +28,6 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp)
} }
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0; int32_t code = 0;
SUseDbBatchRsp batchUseRsp = {0}; SUseDbBatchRsp batchUseRsp = {0};
...@@ -72,8 +71,6 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -72,8 +71,6 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (code) { if (code) {
return code; return code;
} }
msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -264,9 +264,13 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -264,9 +264,13 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0}; SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db); tNameGetDbName(&name, db);
...@@ -300,14 +304,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -300,14 +304,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData; SDropDbRsp dropdbRsp = {0};
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
struct SCatalog *pCatalog = NULL;
rsp->uid = be64toh(rsp->uid);
struct SCatalog* pCatalog = NULL;
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
return code; return code;
......
...@@ -975,14 +975,20 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -975,14 +975,20 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId); pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion); pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
SUseDbRsp usedbRsp = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName); SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
if (pDb == NULL || pDbVgVersion->vgVersion >= pDb->vgVersion) { if (pDb == NULL) {
mDebug("db:%s, no exist", pDbVgVersion->dbFName);
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDbVgVersion->dbId;
usedbRsp.vgVersion = -1;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
} else if (pDbVgVersion->vgVersion >= pDb->vgVersion) {
mDebug("db:%s, version not changed", pDbVgVersion->dbFName);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("db:%s, no need to use db", pDbVgVersion->dbFName);
continue; continue;
} } else {
SUseDbRsp usedbRsp = {0};
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) { if (usedbRsp.pVgroupInfos == NULL) {
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
...@@ -1000,6 +1006,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1000,6 +1006,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
taosArrayPush(batchUseRsp.pArray, &usedbRsp); taosArrayPush(batchUseRsp.pArray, &usedbRsp);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
} }
}
int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp); int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
void *pRsp = malloc(rspLen); void *pRsp = malloc(rspLen);
......
...@@ -69,10 +69,12 @@ void* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { ...@@ -69,10 +69,12 @@ void* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
} }
void* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) { void* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) {
int32_t contLen = sizeof(SDropDbReq); SDropDbReq dropdbReq = {0};
strcpy(dropdbReq.db, dbname);
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq);
strcpy(pReq->db, dbname); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDbReq(pReq, contLen, &dropdbReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;
......
...@@ -9,7 +9,7 @@ char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, ...@@ -9,7 +9,7 @@ char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id,
char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf); char* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, int32_t *len, SParseContext *pCtx, SMsgBuf* pMsgBuf);
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
......
...@@ -39,14 +39,11 @@ char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, ...@@ -39,14 +39,11 @@ char* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id,
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
createReq.maxUsers = htonl(pAcctOpt->maxUsers); createReq.maxUsers = pAcctOpt->maxUsers;
createReq.maxDbs = htonl(pAcctOpt->maxDbs); createReq.maxDbs = pAcctOpt->maxDbs;
createReq.maxTimeSeries = htonl(pAcctOpt->maxTimeSeries); createReq.maxTimeSeries = pAcctOpt->maxTimeSeries;
createReq.maxStreams = htonl(pAcctOpt->maxStreams); createReq.maxStreams = pAcctOpt->maxStreams;
// createReq.maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond); createReq.maxStorage = pAcctOpt->maxStorage;
createReq.maxStorage = htobe64(pAcctOpt->maxStorage);
// createReq.maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
// createReq.maxConnections = htonl(pAcctOpt->maxConnections);
if (pAcctOpt->stat.n == 0) { if (pAcctOpt->stat.n == 0) {
createReq.accessState = -1; createReq.accessState = -1;
...@@ -147,9 +144,9 @@ static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, ...@@ -147,9 +144,9 @@ static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb,
const char* msg2 = "invalid keep value"; const char* msg2 = "invalid keep value";
const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2"; const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2";
pMsg->daysToKeep0 = htonl(-1); pMsg->daysToKeep0 = -1;
pMsg->daysToKeep1 = htonl(-1); pMsg->daysToKeep1 = -1;
pMsg->daysToKeep2 = htonl(-1); pMsg->daysToKeep2 = -1;
SArray* pKeep = pCreateDb->keep; SArray* pKeep = pCreateDb->keep;
if (pKeep != NULL) { if (pKeep != NULL) {
...@@ -209,13 +206,13 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate ...@@ -209,13 +206,13 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate
} }
static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) { static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize); pMsg->cacheBlockSize = pCreateDb->cacheBlockSize;
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks); pMsg->totalBlocks = pCreateDb->numOfBlocks;
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile); pMsg->daysPerFile = pCreateDb->daysPerFile;
pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime); pMsg->commitTime = (int32_t)pCreateDb->commitTime;
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock); pMsg->minRows = pCreateDb->minRowsPerBlock;
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock); pMsg->maxRows = pCreateDb->maxRowsPerBlock;
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod); pMsg->fsyncPeriod = pCreateDb->fsyncPeriod;
pMsg->compression = (int8_t)pCreateDb->compressionLevel; pMsg->compression = (int8_t)pCreateDb->compressionLevel;
pMsg->walLevel = (char)pCreateDb->walLevel; pMsg->walLevel = (char)pCreateDb->walLevel;
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
...@@ -223,7 +220,7 @@ static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) { ...@@ -223,7 +220,7 @@ static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update; pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast; pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups); pMsg->numOfVgroups = pCreateDb->numOfVgroups;
} }
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) { int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
...@@ -240,12 +237,104 @@ int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbS ...@@ -240,12 +237,104 @@ int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext* pCtx, SMsgBuf* pMsgBuf) { // can only perform the parameters based on the macro definitation
SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq)); static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) {
if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) { char msg[512] = {0};
tfree(pCreateMsg);
terrno = TSDB_CODE_TSC_INVALID_OPERATION; if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option walLevel: %d, only 1-2 allowed", pCreate->walLevel);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->replications != -1 &&
(pCreate->replications < TSDB_MIN_DB_REPLICA_OPTION || pCreate->replications > TSDB_MAX_DB_REPLICA_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications,
TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t blocks = pCreate->totalBlocks;
if (blocks != -1 && (blocks < TSDB_MIN_TOTAL_BLOCKS || blocks > TSDB_MAX_TOTAL_BLOCKS)) {
snprintf(msg, tListLen(msg), "invalid db option totalBlocks: %d valid range: [%d, %d]", blocks,
TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->quorum != -1 &&
(pCreate->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCreate->quorum > TSDB_MAX_DB_QUORUM_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option quorum: %d valid range: [%d, %d]", pCreate->quorum,
TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t val = pCreate->daysPerFile;
if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) {
snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE,
TSDB_MAX_DAYS_PER_FILE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->cacheBlockSize;
if (val != -1 && (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE)) {
snprintf(msg, tListLen(msg), "invalid db option cacheBlockSize: %d valid range: [%d, %d]", val,
TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->precision != TSDB_TIME_PRECISION_MILLI && pCreate->precision != TSDB_TIME_PRECISION_MICRO &&
pCreate->precision != TSDB_TIME_PRECISION_NANO) {
snprintf(msg, tListLen(msg), "invalid db option timePrecision: %d valid value: [%d, %d, %d]", pCreate->precision,
TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->commitTime;
if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) {
snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, TSDB_MIN_COMMIT_TIME,
TSDB_MAX_COMMIT_TIME);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->fsyncPeriod;
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD,
TSDB_MAX_FSYNC_PERIOD);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->compression != -1 &&
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = pCreate->numOfVgroups;
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
}
val = pCreate->maxRows;
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
}
val = pCreate->minRows;
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
}
return TSDB_CODE_SUCCESS;
}
char* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, int32_t* len, SParseContext* pCtx, SMsgBuf* pMsgBuf) {
SCreateDbReq createReq = {0};
if (setDbOptions(&createReq, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL; return NULL;
} }
...@@ -256,8 +345,23 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext* pCtx ...@@ -256,8 +345,23 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext* pCtx
return NULL; return NULL;
} }
tNameGetFullDbName(&name, pCreateMsg->db); tNameGetFullDbName(&name, createReq.db);
return pCreateMsg;
if (doCheckDbOptions(&createReq, pMsgBuf) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
}
int32_t tlen = tSerializeSCreateDbReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSCreateDbReq(pReq, tlen, &createReq);
*len = tlen;
return pReq;
} }
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
......
...@@ -125,99 +125,6 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out ...@@ -125,99 +125,6 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// can only perform the parameters based on the macro definitation
static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) {
char msg[512] = {0};
if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option walLevel: %d, only 1-2 allowed", pCreate->walLevel);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->replications != -1 &&
(pCreate->replications < TSDB_MIN_DB_REPLICA_OPTION || pCreate->replications > TSDB_MAX_DB_REPLICA_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications,
TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t blocks = ntohl(pCreate->totalBlocks);
if (blocks != -1 && (blocks < TSDB_MIN_TOTAL_BLOCKS || blocks > TSDB_MAX_TOTAL_BLOCKS)) {
snprintf(msg, tListLen(msg), "invalid db option totalBlocks: %d valid range: [%d, %d]", blocks,
TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->quorum != -1 &&
(pCreate->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCreate->quorum > TSDB_MAX_DB_QUORUM_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option quorum: %d valid range: [%d, %d]", pCreate->quorum,
TSDB_MIN_DB_QUORUM_OPTION, TSDB_MAX_DB_QUORUM_OPTION);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t val = htonl(pCreate->daysPerFile);
if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) {
snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, TSDB_MIN_DAYS_PER_FILE,
TSDB_MAX_DAYS_PER_FILE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = htonl(pCreate->cacheBlockSize);
if (val != -1 && (val < TSDB_MIN_CACHE_BLOCK_SIZE || val > TSDB_MAX_CACHE_BLOCK_SIZE)) {
snprintf(msg, tListLen(msg), "invalid db option cacheBlockSize: %d valid range: [%d, %d]", val,
TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MAX_CACHE_BLOCK_SIZE);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->precision != TSDB_TIME_PRECISION_MILLI && pCreate->precision != TSDB_TIME_PRECISION_MICRO &&
pCreate->precision != TSDB_TIME_PRECISION_NANO) {
snprintf(msg, tListLen(msg), "invalid db option timePrecision: %d valid value: [%d, %d, %d]", pCreate->precision,
TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = htonl(pCreate->commitTime);
if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) {
snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, TSDB_MIN_COMMIT_TIME,
TSDB_MAX_COMMIT_TIME);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = htonl(pCreate->fsyncPeriod);
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val, TSDB_MIN_FSYNC_PERIOD,
TSDB_MAX_FSYNC_PERIOD);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
if (pCreate->compression != -1 &&
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
return buildInvalidOperationMsg(pMsgBuf, msg);
}
val = htonl(pCreate->numOfVgroups);
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
}
val = htonl(pCreate->maxRows);
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
}
val = htonl(pCreate->minRows);
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
}
return TSDB_CODE_SUCCESS;
}
static int32_t validateTableColumns(SArray* pFieldList, int32_t maxRowLength, int32_t maxColumns, SMsgBuf* pMsgBuf) { static int32_t validateTableColumns(SArray* pFieldList, int32_t maxRowLength, int32_t maxColumns, SMsgBuf* pMsgBuf) {
const char* msg2 = "row length exceeds max length"; const char* msg2 = "row length exceeds max length";
const char* msg3 = "duplicated column names"; const char* msg3 = "duplicated column names";
...@@ -852,11 +759,15 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch ...@@ -852,11 +759,15 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
goto _error; goto _error;
} }
SUseDbReq* pUseDbMsg = (SUseDbReq*)calloc(1, sizeof(SUseDbReq)); SUseDbReq usedbReq = {0};
tNameExtractFullName(&n, pUseDbMsg->db); tNameExtractFullName(&n, usedbReq.db);
pDcl->pMsg = (char*)pUseDbMsg; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
pDcl->msgLen = sizeof(SUseDbReq); void* pBuf = malloc(bufLen);
tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
pDcl->pMsg = pBuf;
pDcl->msgLen = bufLen;
pDcl->msgType = TDMT_MND_USE_DB; pDcl->msgType = TDMT_MND_USE_DB;
break; break;
} }
...@@ -880,14 +791,11 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch ...@@ -880,14 +791,11 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
goto _error; goto _error;
} }
SCreateDbReq* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); int32_t bufLen = 0;
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { char* pBuf = buildCreateDbMsg(pCreateDB, &bufLen, pCtx, pMsgBuf);
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
pDcl->pMsg = (char*)pCreateMsg; pDcl->pMsg = pBuf;
pDcl->msgLen = sizeof(SCreateDbReq); pDcl->msgLen = bufLen;
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB; pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB;
break; break;
} }
...@@ -905,15 +813,18 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch ...@@ -905,15 +813,18 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
goto _error; goto _error;
} }
SDropDbReq* pDropDbMsg = (SDropDbReq*)calloc(1, sizeof(SDropDbReq)); SDropDbReq dropdbReq = {0};
code = tNameExtractFullName(&name, dropdbReq.db);
code = tNameExtractFullName(&name, pDropDbMsg->db); dropdbReq.ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_DB_NAME_T); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_DB_NAME_T);
int32_t bufLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq);
void* pBuf = malloc(bufLen);
tSerializeSDropDbReq(pBuf, bufLen, &dropdbReq);
pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbReq); pDcl->msgLen = bufLen;
pDcl->pMsg = (char*)pDropDbMsg; pDcl->pMsg = pBuf;
break; break;
} }
......
...@@ -55,30 +55,24 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 ...@@ -55,30 +55,24 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == input || NULL == msg || NULL == msgLen) { if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
SBuildUseDBInput* bInput = (SBuildUseDBInput *)input; SBuildUseDBInput *bInput = input;
int32_t estimateSize = sizeof(SUseDbReq); SUseDbReq usedbReq = {0};
if (NULL == *msg || msgSize < estimateSize) { strncpy(usedbReq.db, bInput->db, sizeof(usedbReq.db));
tfree(*msg); usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
*msg = rpcMallocCont(estimateSize); usedbReq.vgVersion = bInput->vgVersion;
if (NULL == *msg) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
SUseDbReq *bMsg = (SUseDbReq *)*msg;
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db)); int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
bMsg->db[sizeof(bMsg->db) - 1] = 0; void *pBuf = rpcMallocCont(bufLen);
tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
bMsg->vgVersion = bInput->vgVersion; *msg = pBuf;
*msgLen = bufLen;
*msgLen = (int32_t)sizeof(*bMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -133,7 +127,7 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { ...@@ -133,7 +127,7 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
return code; return code;
_return: _return:
taosArrayDestroy(usedbRsp.pVgroupInfos); tFreeSUsedbRsp(&usedbRsp);
if (pOut) { if (pOut) {
taosHashCleanup(pOut->dbVgroup->vgHash); taosHashCleanup(pOut->dbVgroup->vgHash);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册