diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a3d62cc52d6fec125d74765845e354040f9c9df7..7690e535941742ba575d9517f662aeb077fd1a3c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1520,7 +1520,7 @@ typedef struct SMqSetCVgReq { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; char* sql; char* logicalPlan; char* physicalPlan; @@ -1532,7 +1532,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->topicName); - tlen += taosEncodeString(buf, pReq->cGroup); + tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); @@ -1543,7 +1543,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->topicName); - buf = taosDecodeStringTo(buf, pReq->cGroup); + buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4158d7e7c73bf46e01a01e08418592d12dad1c2e..4cf7505f74ec214606f44b86a40b696aad4cd5f0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -100,7 +100,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { .vgId = pCEp->vgId, .consumerId = consumerId, }; - strcpy(req.cGroup, cgroup); + strcpy(req.cgroup, cgroup); strcpy(req.topicName, topic); strcpy(req.sql, pTopic->sql); strcpy(req.logicalPlan, pTopic->logicalPlan); @@ -168,7 +168,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume .vgId = vgId, .consumerId = pConsumer->consumerId, }; - strcpy(req.cGroup, pConsumer->cgroup); + strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.topicName, pTopic->name); strcpy(req.sql, pTopic->sql); strcpy(req.logicalPlan, pTopic->logicalPlan); diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index e394297ab8db0021193bbdb856397aa21bc0a634..ec71777882ba72769c90d9d222dbd949a0733090 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -297,7 +297,7 @@ typedef struct STQ { STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; - SWal * pWal; + SWal* pWal; } STQ; typedef struct STqMgmt { @@ -331,7 +331,8 @@ int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif -int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); +int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); +int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq); typedef struct STqReadHandle { int64_t ver; @@ -340,13 +341,15 @@ typedef struct STqReadHandle { SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; SMeta* pMeta; + SArray* pColumnIdList; } STqReadHandle; -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList); +void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle* pHandle); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); // return SArray -SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 15c797125250c2a4ff328e9ea52459f6daaaf10b..ead856a06b17ccdb3cb01e1645d1c96860c10720 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -606,7 +606,7 @@ int tqItemSSize() { return 0; } -int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { +int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SMqCVConsumeReq* pReq = pMsg->pCont; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; @@ -623,6 +623,7 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); if (old == 1) { // do nothing + continue; } if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { // TODO @@ -635,7 +636,17 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + // TODO: launch query and get output data void* outputData; + pHandle->buffer.output[pos].dst = outputData; + if (pHandle->buffer.firstOffset == -1 + || pReq->offset < pHandle->buffer.firstOffset) { + pHandle->buffer.firstOffset = pReq->offset; + } + if (pHandle->buffer.lastOffset == -1 + || pReq->offset > pHandle->buffer.lastOffset) { + pHandle->buffer.lastOffset = pReq->offset; + } atomic_store_8(&pHandle->buffer.output[pos].status, 1); // put output into rsp @@ -647,18 +658,64 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { return 0; } -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) { +int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { + STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); + if (pConsumer == NULL) { + return -1; + } + + STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); + if (pTopic == NULL) { + free(pConsumer); + return -1; + } + strcpy(pTopic->topicName, pReq->topicName); + strcpy(pTopic->cgroup, pReq->cgroup); + strcpy(pTopic->sql, pReq->sql); + strcpy(pTopic->logicalPlan, pReq->logicalPlan); + strcpy(pTopic->physicalPlan, pReq->physicalPlan); + SArray *pArray; + //TODO: deserialize to SQueryDag + SQueryDag *pDag; + // convert to task + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + // TODO: handle error + } + ASSERT(taosArrayGetSize(pArray) == 0); + STaskInfo *pInfo = taosArrayGet(pArray, 0); + SArray* pTasks; + schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); + pTopic->buffer.firstOffset = -1; + pTopic->buffer.lastOffset = -1; + for (int i = 0; i < TQ_BUFFER_SIZE; i++) { + SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); + pTopic->buffer.output[i].pMsg = pMsg; + pTopic->buffer.output[i].status = 0; + } + pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); + // write mq meta + return 0; +} + +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { return NULL; } pReadHandle->pMeta = pMeta; - pReadHandle->pMsg = pMsg; - tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); + pReadHandle->pMsg = NULL; pReadHandle->ver = -1; + pReadHandle->pColumnIdList = pColumnIdList; return NULL; } +void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { + pReadHandle->pMsg = pMsg; + tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); + pReadHandle->ver = ver; + memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); +} + bool tqNextDataBlock(STqReadHandle* pHandle) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; @@ -676,7 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) // TODO: filter out unused column return 0; } -SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); @@ -691,11 +748,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { return NULL; } - for (int i = 0; i < pTschema->numOfCols; i++) { - // TODO: filter out unused column - taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId)); - } - SMemRow row; int32_t kvIdx; while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index fb36e82dec1a948e901db55879321a2173bf3707..a01960b77371d6226748318ce507a1a0dc08de89 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); case TDMT_VND_CONSUME: - return tqProcessConsume(pVnode->pTq, pMsg, pRsp); + return tqProcessConsumeReq(pVnode->pTq, pMsg, pRsp); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 6e2d438970f00fa3c87d8ebadece933ee4cbb9c8..d1b529f7fb7d9587dfa700b4f7d922c8c0f66205 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "vnd.h" #include "tq.h" +#include "vnd.h" int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { @@ -34,7 +34,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); // ser request version - void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int64_t ver = pVnode->state.processed++; taosEncodeFixedU64(&pBuf, ver); @@ -53,7 +53,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SVCreateTbReq vCreateTbReq; SVCreateTbBatchReq vCreateTbBatchReq; - void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + void *ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -110,43 +110,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { - //TODO: wrap in a function - char* reqStr = ptr; SMqSetCVgReq req; - tDecodeSMqSetCVgReq(reqStr, &req); - STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); - - STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); - if (pTopic == NULL) { - // TODO: handle error - } - strcpy(pTopic->topicName, req.topicName); - strcpy(pTopic->cgroup, req.cGroup); - strcpy(pTopic->sql, req.sql); - strcpy(pTopic->logicalPlan, req.logicalPlan); - strcpy(pTopic->physicalPlan, req.physicalPlan); - SArray *pArray; - //TODO: deserialize to SQueryDag - SQueryDag *pDag; - // convert to task - if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - // TODO: handle error - } - ASSERT(taosArrayGetSize(pArray) == 0); - STaskInfo *pInfo = taosArrayGet(pArray, 0); - SArray* pTasks; - schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); - pTopic->buffer.firstOffset = -1; - pTopic->buffer.lastOffset = -1; - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); - pTopic->buffer.output[i].pMsg = pMsg; - pTopic->buffer.output[i].status = 0; + tDecodeSMqSetCVgReq(ptr, &req); + if (tqProcessSetConnReq(pVnode->pTq, &req) < 0) { } - pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal); - // write mq meta - } - break; + } break; default: ASSERT(0); break;