From 9f4744c6f3d7897d947d482fc8249e228c27a869 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Apr 2022 16:05:30 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 37 +++++++++ include/common/tmsgdef.h | 1 + include/libs/catalog/catalog.h | 4 + source/common/src/tmsg.c | 92 +++++++++++++++++++++ source/dnode/mgmt/mm/mmHandle.c | 1 + source/dnode/mnode/impl/inc/mndDef.h | 2 + source/dnode/mnode/impl/src/mndDb.c | 90 ++++++++++++++++++++ source/dnode/mnode/impl/src/mndInfoSchema.c | 6 +- source/libs/catalog/src/catalog.c | 48 ++++++++++- source/libs/parser/src/parTranslater.c | 2 + source/libs/qcom/src/querymsg.c | 40 ++++++++- source/libs/scheduler/src/scheduler.c | 6 +- 12 files changed, 322 insertions(+), 7 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 61da9cb213..4efb68d490 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -512,6 +512,7 @@ typedef struct { int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; + int32_t ttl; int8_t walLevel; int8_t precision; // time resolution int8_t compression; @@ -521,6 +522,7 @@ typedef struct { int8_t cacheLastRow; int8_t ignoreExist; int8_t streamMode; + int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; // SRetention } SCreateDbReq; @@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp); void tFreeSUsedbRsp(SUseDbRsp* pRsp); +typedef struct { + char db[TSDB_DB_FNAME_LEN]; +} SDbCfgReq; + +int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); +int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); + +typedef struct { + int32_t numOfVgroups; + int32_t cacheBlockSize; + int32_t totalBlocks; + int32_t daysPerFile; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t minRows; + int32_t maxRows; + int32_t commitTime; + int32_t fsyncPeriod; + int32_t ttl; + int8_t walLevel; + int8_t precision; + int8_t compression; + int8_t replications; + int8_t quorum; + int8_t update; + int8_t cacheLastRow; + int8_t streamMode; + int8_t singleSTable; +} SDbCfgRsp; + +int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp); +int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp); + + typedef struct { int32_t rowNum; } SQnodeListReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 31ca2ac215..63d1fc5014 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -155,6 +155,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 9f0d4b11c2..a0b342fca2 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -77,6 +77,8 @@ typedef struct SDbVgVersion { int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; +typedef SDbCfgRsp SDbCfgInfo; + int32_t catalogInit(SCatalogCfg *cfg); /** @@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); +int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); + /** * Destroy catalog and relase all resources diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index acb77648ae..57a8953253 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1; if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1; if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; + if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1; if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; @@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1; if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1; + if (tEncodeI8(&encoder, pReq->singleSTable) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); @@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1; if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1; if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; @@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1; if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->singleSTable) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1; pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention)); if (pReq->pRetensions == NULL) { @@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } +int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->cacheBlockSize) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->totalBlocks) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->commitTime) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->fsyncPeriod) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->quorum) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->update) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->cacheBlockSize) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->totalBlocks) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->commitTime) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->fsyncPeriod) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->quorum) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1; + + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + + int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index dc7725dcd2..e5495c7e1d 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -146,6 +146,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE); // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 38ef1185e8..38dbdc4799 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -260,6 +260,7 @@ typedef struct { int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; + int32_t ttl; int8_t walLevel; int8_t precision; int8_t compression; @@ -268,6 +269,7 @@ typedef struct { int8_t update; int8_t cacheLastRow; int8_t streamMode; + int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; } SDbCfg; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 44c547bec3..ff2ea162fb 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq); static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); +static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); int32_t mndInitDb(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DB, @@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq); mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq); mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq); + mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs); @@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->minRows > pCfg->maxRows) return -1; if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1; if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1; + if (pCfg->ttl < TSDB_MIN_DB_TTL_OPTION && pCfg->ttl != TSDB_DEFAULT_DB_TTL_OPTION) return -1; if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1; if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1; if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1; @@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1; if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1; + if (pCfg->singleSTable < TSDB_MIN_DB_SINGLE_STABLE_OPTION || pCfg->streamMode > TSDB_MAX_DB_SINGLE_STABLE_OPTION) return -1; return TSDB_CODE_SUCCESS; } @@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK; if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME; if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; + if (pCfg->ttl < 0) pCfg->ttl = TSDB_DEFAULT_DB_TTL_OPTION; if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL; if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION; if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL; @@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE; + if (pCfg->singleSTable < 0) pCfg->singleSTable = TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; } @@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate .maxRows = pCreate->maxRows, .commitTime = pCreate->commitTime, .fsyncPeriod = pCreate->fsyncPeriod, + .ttl = pCreate->ttl, .walLevel = pCreate->walLevel, .precision = pCreate->precision, .compression = pCreate->compression, @@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate .update = pCreate->update, .cacheLastRow = pCreate->cacheLastRow, .streamMode = pCreate->streamMode, + .singleSTable = pCreate->singleSTable, }; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; @@ -730,6 +738,71 @@ ALTER_DB_OVER: return code; } +static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SDbObj *pDb = NULL; + SDbCfgReq cfgReq = {0}; + SDbCfgRsp cfgRsp = {0}; + + if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto GET_DB_CFG_OVER; + } + + pDb = mndAcquireDb(pMnode, cfgReq.db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + goto GET_DB_CFG_OVER; + } + + cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; + cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize; + cfgRsp.totalBlocks = pDb->cfg.totalBlocks; + cfgRsp.daysPerFile = pDb->cfg.daysPerFile; + cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0; + cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1; + cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2; + cfgRsp.minRows = pDb->cfg.minRows; + cfgRsp.maxRows = pDb->cfg.maxRows; + cfgRsp.commitTime = pDb->cfg.commitTime; + cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod; + cfgRsp.ttl = pDb->cfg.ttl; + cfgRsp.walLevel = pDb->cfg.walLevel; + cfgRsp.precision = pDb->cfg.precision; + cfgRsp.compression = pDb->cfg.compression; + cfgRsp.replications = pDb->cfg.replications; + cfgRsp.quorum = pDb->cfg.quorum; + cfgRsp.update = pDb->cfg.update; + cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow; + cfgRsp.streamMode = pDb->cfg.streamMode; + cfgRsp.singleSTable = pDb->cfg.singleSTable; + + int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto GET_DB_CFG_OVER; + } + + tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp); + + pReq->pRsp = pRsp; + pReq->rspLen = contLen; + +GET_DB_CFG_OVER: + + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr()); + } + + mndReleaseDb(pMnode, pDb); + + return code; +} + + static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); if (pRedoRaw == NULL) return -1; @@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_ STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.ttl; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.singleSTable; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.streamMode; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *status = "ready"; + STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status)); + cols++; + // pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); // *(int8_t *)pWrite = pDb->cfg.update; } diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 92b854157b..aadd914439 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = { {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, // {.name = "update", .bytes = 1, .type = // TSDB_DATA_TYPE_TINYINT}, // disable update }; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e1f5332899..6ed09ce7eb 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE return TSDB_CODE_SUCCESS; } +int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *dbFName, SDbCfgInfo *out) { + char *msg = NULL; + int32_t msgLen = 0; + + ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)]((void *)dbFName, &msg, 0, &msgLen); + if (code) { + ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); + CTG_ERR_RET(code); + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_GET_DB_CFG, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rpcRsp.code), dbFName); + CTG_ERR_RET(rpcRsp.code); + } + + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process get db cfg rsp failed, code:%x, db:%s", code, dbFName); + CTG_ERR_RET(code); + } + + ctgDebug("Got db cfg from mnode, dbFName:%s", dbFName); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { if (NULL == pCtg->dbCache) { *exist = 0; @@ -2137,7 +2175,6 @@ _return: CTG_RET(code); } - int32_t catalogInit(SCatalogCfg *cfg) { if (gCtgMgmt.pCluster) { qError("catalog already initialized"); @@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } +int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg)); +} void catalogDestroy(void) { qInfo("start to destroy catalog"); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3491974eab..8b3bf5b492 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -984,6 +984,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW); pReq->ignoreExist = pStmt->ignoreExists; pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION); + pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION); + pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 1e91c55dc0..d211e780b0 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SDbCfgReq dbCfgReq = {0}; + strcpy(dbCfgReq.db, input); + + int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; @@ -309,17 +326,36 @@ PROCESS_QLIST_OVER: return code; } +int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { + SDbCfgRsp out = {0}; + + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) { + qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + memcpy(output, &out, sizeof(out)); + + return TSDB_CODE_SUCCESS; +} + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; } #pragma GCC diagnostic pop diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d1352bac24..5fb6069eff 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1251,8 +1251,8 @@ int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask ** return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle) { - if (NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) { +int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) { + if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } @@ -1293,7 +1293,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); SCH_SET_TASK_HANDLE(pTask, pMsg->handle); - schUpdateTaskExecNodeHandle(pTask, pMsg->handle); + schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: -- GitLab