提交 b4203385 编写于 作者: D dapan1121

feature/qnode

上级 bd10336a
......@@ -774,8 +774,8 @@ typedef struct {
} SAuthVnodeMsg;
typedef struct {
int32_t vgId;
char tableFname[TSDB_TABLE_FNAME_LEN];
SMsgHead header;
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg;
typedef struct {
......@@ -1059,6 +1059,7 @@ typedef struct {
} SUpdateTagValRsp;
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......@@ -1067,6 +1068,7 @@ typedef struct SSubQueryMsg {
} SSubQueryMsg;
typedef struct SResReadyMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......@@ -1077,6 +1079,7 @@ typedef struct SResReadyRsp {
} SResReadyRsp;
typedef struct SResFetchMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......
......@@ -121,8 +121,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
......
......@@ -17,6 +17,7 @@
#include "vnodeDef.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
......@@ -43,6 +44,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_SHOW_TABLES_FETCH:
return vnodeGetTableList(pVnode, pMsg);
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......@@ -88,7 +91,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTagSchema = NULL;
}
pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols));
int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) {
return -1;
}
......@@ -115,6 +119,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pSch->bytes = htonl(pSch->bytes);
}
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pTbMetaMsg,
.contLen = msgLen,
.code = 0,
};
rpcSendResponse(&rpcMsg);
return 0;
}
......@@ -166,4 +180,4 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
rpcSendResponse(&rpcMsg);
return 0;
}
\ No newline at end of file
}
......@@ -34,7 +34,6 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
if (NULL == info) {
*inCache = false;
assert(0);
ctgWarn("no db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
......@@ -190,14 +189,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
tNameExtractFullName(pTableName, tbFullName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
char *msg = NULL;
......@@ -429,8 +427,6 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
if (0 == forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
assert(inCache);
if (inCache) {
return TSDB_CODE_SUCCESS;
}
......@@ -694,9 +690,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
STableMetaOutput output = {0};
//CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
......
......@@ -40,7 +40,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->vgId = bInput->vgId;
bMsg->header.vgId = htonl(bInput->vgId);
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册