diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 17a4d4ad2cbcc446ebe860b8cb0d747189485053..0b7ba24c6b174ad0698d9520d1b115a0dc9f7c20 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -774,8 +774,8 @@ typedef struct { } SAuthVnodeMsg; typedef struct { - int32_t vgId; - char tableFname[TSDB_TABLE_FNAME_LEN]; + SMsgHead header; + char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; typedef struct { @@ -1059,6 +1059,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSubQueryMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1067,6 +1068,7 @@ typedef struct SSubQueryMsg { } SSubQueryMsg; typedef struct SResReadyMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1077,6 +1079,7 @@ typedef struct SResReadyRsp { } SResReadyRsp; typedef struct SResFetchMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index e5be16937bb4bc5eb909eba8cccf4f94ac2578c6..dc5248c8e0ecd5cfddab311d2db5b8cb1ab12897 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -121,8 +121,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 01df929080265477536b07a47aa42009051e18ab..daa1e964dea80b11f12bb3a861a69c6437563f37 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -17,6 +17,7 @@ #include "vnodeDef.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } @@ -43,6 +44,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_SHOW_TABLES_FETCH: return vnodeGetTableList(pVnode, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); + case TDMT_VND_TABLE_META: + return vnodeGetTableMeta(pVnode, pMsg, pRsp); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -88,7 +91,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTagSchema = NULL; } - pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols)); + int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); + pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { return -1; } @@ -115,6 +119,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pSch->bytes = htonl(pSch->bytes); } + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pTbMetaMsg, + .contLen = msgLen, + .code = 0, + }; + + rpcSendResponse(&rpcMsg); + return 0; } @@ -166,4 +180,4 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcMsg); return 0; -} \ No newline at end of file +} diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b8fa779e70db601d91543115745a6f5a3ec8c479..68ff1b8557f3276f800f0bb33ea21bcbcab12444 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -34,7 +34,6 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S if (NULL == info) { *inCache = false; - assert(0); ctgWarn("no db cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } @@ -190,14 +189,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE } -int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { +int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } char tbFullName[TSDB_TABLE_FNAME_LEN]; - - snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + tNameExtractFullName(pTableName, tbFullName); SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; char *msg = NULL; @@ -429,8 +427,6 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm if (0 == forceUpdate) { CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); - assert(inCache); - if (inCache) { return TSDB_CODE_SUCCESS; } @@ -694,9 +690,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe STableMetaOutput output = {0}; - //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output)); - CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); + //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 27ca406fc47745d2857719865d079f371c22064b..117297b9ffa90b93d56be5a872950a4f0ac9d1ff 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -40,7 +40,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 STableInfoMsg *bMsg = (STableInfoMsg *)*msg; - bMsg->vgId = bInput->vgId; + bMsg->header.vgId = htonl(bInput->vgId); strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;