未验证 提交 7fba094c 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9915 from taosdata/feature/tq

add msg for vnode consume
......@@ -1552,6 +1552,25 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf;
}
typedef struct SMqCVConsumeReq {
int64_t reqId;
int64_t offset;
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq;
typedef struct SMqCVConsumeRsp {
int64_t reqId;
int64_t clientId;
int64_t committedOffset;
int64_t receiveOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqCvConsumeRsp;
#ifdef __cplusplus
}
......
......@@ -175,6 +175,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
// Requests handled by QNODE
......
......@@ -464,7 +464,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
taosArrayPush(pSub->availConsumer, &consumerId);
//TODO: no need
// TODO: no need
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic);
......@@ -542,7 +542,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
......
......@@ -25,6 +25,7 @@ target_link_libraries(
PUBLIC bdb
PUBLIC tfs
PUBLIC wal
PUBLIC scheduler
PUBLIC qworker
)
......
......@@ -18,14 +18,15 @@
#include "common.h"
#include "mallocator.h"
#include "meta.h"
#include "os.h"
#include "scheduler.h"
#include "taoserror.h"
#include "tmsg.h"
#include "tlist.h"
#include "tmsg.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
......@@ -150,15 +151,40 @@ typedef struct STqListHandle {
} STqList;
typedef struct STqGroup {
int64_t clientId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
int64_t clientId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
STqList* head;
SList* topicList; // SList<STqTopic>
STqRspHandle rspHandle;
} STqGroup;
typedef struct STqTaskItem {
int32_t status;
void* dst;
SSubQueryMsg* pMsg;
} STqTaskItem;
// new version
typedef struct STqBuffer {
int64_t firstOffset;
int64_t lastOffset;
STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer;
typedef struct STqClientHandle {
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
} STqClientHandle;
typedef struct STqQueryMsg {
STqMsgItem* item;
struct STqQueryMsg* next;
......@@ -253,7 +279,7 @@ typedef struct STqMetaStore {
// a table head
STqMetaList* unpersistHead;
// topics that are not connectted
STqMetaList* unconnectTopic;
STqMetaList* unconnectTopic;
// TODO:temporaral use, to be replaced by unified tfile
int fileFd;
......@@ -316,24 +342,22 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static int tqQueryExecuting(int32_t status) { return status; }
typedef struct STqReadHandle {
int64_t ver;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
int64_t ver;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pMeta;
SMeta* pMeta;
} STqReadHandle;
typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo);
//return SArray<SColumnInfoData>
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
// return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
#ifdef __cplusplus
}
......
......@@ -177,4 +177,4 @@ bool vmaIsFull(SVMemAllocator* pVMA);
}
#endif
#endif /*_TD_VNODE_DEF_H_*/
\ No newline at end of file
#endif /*_TD_VNODE_DEF_H_*/
......@@ -57,6 +57,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME:
return 0;
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "tq.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
......@@ -111,9 +112,34 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_MQ_SET_CONN: {
char* reqStr = ptr;
SMqSetCVgReq req;
/*tDecodeSMqSetCVgReq(reqStr, &req);*/
// create topic if not exist
tDecodeSMqSetCVgReq(reqStr, &req);
STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1);
if (pHandle == NULL) {
// TODO: handle error
}
strcpy(pHandle->topicName, req.topicName);
strcpy(pHandle->cGroup, req.cGroup);
strcpy(pHandle->sql, req.sql);
strcpy(pHandle->logicalPlan, req.logicalPlan);
strcpy(pHandle->physicalPlan, req.physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pHandle->buffer.firstOffset = -1;
pHandle->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pHandle->buffer.output[i].pMsg = pMsg;
pHandle->buffer.output[i].status = 0;
}
// write mq meta
}
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册