diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4ff8af9b602170b71b1c38e9ed3bb5b08ab0d393..b0b0687d16e36458d215fcb7b7cc547e6c697791 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -168,7 +168,7 @@ typedef struct { int32_t vgId; char* dbFName; char* tbName; -} SBuildTableMetaInput; +} SBuildTableInput; typedef struct { char db[TSDB_DB_FNAME_LEN]; @@ -667,6 +667,12 @@ int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); +typedef struct { + SMsgHead header; + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; +} STableCfgReq; + typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; @@ -684,6 +690,15 @@ typedef struct { SSchema* pSchemas; } STableCfg; +typedef STableCfg STableCfgRsp; + +int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq); +int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq); + +int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp); +int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp); + + typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroups; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f89f548fc5b97bca0fe6cfb0164debda72627c77..a6c40cd6a9c14eaa297c24bba29514ce33c30a37 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -131,6 +131,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) @@ -171,6 +172,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "update-tag-val", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2443ffbb7f24fb3a405fb5445cd14d95f662351d..b88b9f660cb2c3a70ae30c8591ecee8cafe8d660 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1757,6 +1757,61 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { taosArrayDestroy(pRsp->pFuncInfos); } +int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->tbName) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->tbName) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) { + +} + +int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) { + +} + + int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a845ae7b39d2582e52a6a01cfb222d848d3abc25..ffc6fc137ca826102f232aaa28b7c81723d6f4d8 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -180,6 +180,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f112727c0863849119468708af1ce2d2ffbf605c..f6459a9ee1bf671d166f19ef066617ce14ecbecc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -330,6 +330,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c37cdd188875bef4f16a4c5cd4827a99c761df56..2865f5cbc76f2ddc6b7feded1204640a2794f21c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -62,6 +62,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); + mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 2333a93ce094e000b725e4df917d7b088f4cc4e3..31d8cf9a60bb9159b4bd46c265d4a68c1821eab2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -124,6 +124,102 @@ _exit: return TSDB_CODE_SUCCESS; } +int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) { + STableCfgReq cfgReq = {0}; + STableCfgRsp cfgRsp = {0}; + SMetaReader mer1 = {0}; + SMetaReader mer2 = {0}; + char tableFName[TSDB_TABLE_FNAME_LEN]; + SRpcMsg rpcMsg; + int32_t code = 0; + int32_t rspLen = 0; + void *pRsp = NULL; + SSchemaWrapper schema = {0}; + SSchemaWrapper schemaTag = {0}; + + // decode req + if (tDeserializeSTableCfgReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + strcpy(cfgRsp.tbName, cfgReq.tbName); + memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName)); + + sprintf(tableFName, "%s.%s", cfgReq.dbFName, cfgReq.tbName); + code = vnodeValidateTableHash(pVnode, tableFName); + if (code) { + goto _exit; + } + + // query meta + metaReaderInit(&mer1, pVnode->pMeta, 0); + + if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) { + code = terrno; + goto _exit; + } + + cfgRsp.tableType = mer1.me.type; + + if (mer1.me.type == TSDB_SUPER_TABLE) { + strcpy(cfgRsp.stbName, mer1.me.name); + schema = mer1.me.stbEntry.schemaRow; + schemaTag = mer1.me.stbEntry.schemaTag; + } else if (mer1.me.type == TSDB_CHILD_TABLE) { + metaReaderInit(&mer2, pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; + + strcpy(cfgRsp.stbName, mer2.me.name); + schema = mer2.me.stbEntry.schemaRow; + schemaTag = mer2.me.stbEntry.schemaTag; + } else if (mer1.me.type == TSDB_NORMAL_TABLE) { + schema = mer1.me.ntbEntry.schemaRow; + } else { + ASSERT(0); + } + + cfgRsp.numOfTags = schemaTag.nCols; + cfgRsp.numOfColumns = schema.nCols; + cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags)); + + memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); + if (schemaTag.nCols) { + memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols); + } + + // encode and send response + rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp); + if (rspLen < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp); + +_exit: + rpcMsg.info = pMsg->info; + rpcMsg.pCont = pRsp; + rpcMsg.contLen = rspLen; + rpcMsg.code = code; + + if (code) { + qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code)); + } + + tmsgSendRsp(&rpcMsg); + + taosMemoryFree(cfgRsp.pSchemas); + metaReaderClear(&mer2); + metaReaderClear(&mer1); + return TSDB_CODE_SUCCESS; +} + int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); pLoad->syncState = syncGetMyRole(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index feb8419bdf3a1333e962ff164bd87b21bb0f26ca..4b1e6f800ec30a65c96072c67ce00f4046231c71 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -251,6 +251,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg); + case TDMT_VND_TABLE_CFG: + return vnodeGetTableCfg(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); case TDMT_STREAM_TASK_RUN: diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 7c90c3538cc3428d6c6456c8c67f9acb4818e82c..76b8634c5b765c2ee7fca255b3eb20d79007a3a2 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -520,6 +520,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation); int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation); int32_t ctgOpClearCache(SCtgCacheOperation *operation); +int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bb02895569c180ea442992f60818e541a842eb8d..7096d4b750fab85da46ed25a426a92d20bb55796 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -381,6 +381,23 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, int32_t *tbType) { + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + CTG_ERR_RET(ctgReadTbTypeFromCache(pCtg, dbFName, pTableName->tname, tbType)); + if (*tbType > 0) { + return TSDB_CODE_SUCCESS; + } + + STableMeta* pMeta = NULL; + CTG_ERR_RET(catalogGetTableMeta(pCtg, pConn, pTableName, &pMeta)); + + *tbType = pMeta->tableType; + taosMemoryFree(pMeta); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pRes) { CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes)); if (*pRes) { @@ -419,6 +436,20 @@ _return: CTG_RET(code); } +int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, STableCfg** pCfg) { + int32_t tbType = 0; + CTG_ERR_RET(ctgGetTbType(pCtg, pConn, pTableName, &tbType)); + + if (TSDB_SUPER_TABLE == tbType) { + CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL)); + } else { + SVgroupInfo vgroupInfo = {0}; + CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, pTableName, &vgroupInfo)); + CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL)); + } + + CTG_RET(TSDB_CODE_SUCCESS); +} int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) { STableMeta *tbMeta = NULL; @@ -1207,6 +1238,16 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogGetTableCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg** pCfg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pConn || NULL == pTableName || NULL == pCfg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetTbCfg(pCtg, pConn, pTableName, pCfg, NULL)); +} + int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 3650b4d9686fd99fdbe9fe9e8ec2625c0daeadfc..c87965c504abf6336ae7a9afddb0c69bf1d79d55 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -540,10 +540,10 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, } -int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType) { +int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int32_t *tbType) { SCtgDBCache *dbCache = NULL; SCtgTbCache *tbCache = NULL; - CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tableName, &dbCache, &tbCache)); + CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache)); if (NULL == tbCache) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); return TSDB_CODE_SUCCESS; @@ -552,7 +552,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, i *tbType = tbCache->pMeta->tableType; ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tableName, *tbType, dbFName); + ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index fa1a26283286d01cc4574c4babbb28ad5fb36996..885aaa3a7c405559443a47848fd299befa804000 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -550,7 +550,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) { - SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; + SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -608,7 +608,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa ctgDebug("try to get table meta from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; + SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; char *msg = NULL; int32_t msgLen = 0; @@ -646,4 +646,83 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) { + char *msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_VND_TABLE_CFG; + char tbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFName); + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = pTableName->tname}; + + ctgDebug("try to get table cfg from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); + CTG_ERR_RET(code); + } + + if (pTask) { + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) { + char *msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_TABLE_CFG; + char tbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFName); + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = pTableName->tname}; + + ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); + CTG_ERR_RET(code); + } + + if (pTask) { + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + return TSDB_CODE_SUCCESS; +} + diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index b89c220519c60c3c3b67cc10f001ff3447448bbd..2766e068a9420d5db014ae7963c4feed3e8abf68 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -63,7 +63,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { } int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { - SBuildTableMetaInput *pInput = input; + SBuildTableInput *pInput = input; if (NULL == input || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -221,6 +221,27 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_ return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildTableInput *pInput = input; + STableCfgReq cfgReq = {0}; + cfgReq.header.vgId = pInput->vgId; + strcpy(cfgReq.dbFName, pInput->dbFName); + strcpy(cfgReq.tbName, pInput->tbName); + + int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq); + void *pBuf = (*mallcFp)(bufLen); + tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; @@ -494,6 +515,24 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) { } +int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp)); + if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) { + qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + *(STableCfgRsp**)output = out; + + return TSDB_CODE_SUCCESS; +} + + + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -504,6 +543,8 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg; + queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -514,6 +555,8 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; } #pragma GCC diagnostic pop