未验证 提交 fa145e68 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9963 from taosdata/feature/tq

consume skip ununsed table
...@@ -1590,8 +1590,8 @@ typedef struct SMqCVConsumeReq { ...@@ -1590,8 +1590,8 @@ typedef struct SMqCVConsumeReq {
typedef struct SMqConsumeRspBlock { typedef struct SMqConsumeRspBlock {
int32_t bodyLen; int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char body[]; char body[];
} SMqConsumeRspBlock; } SMqConsumeRspBlock;
typedef struct SMqCVConsumeRsp { typedef struct SMqCVConsumeRsp {
......
...@@ -68,12 +68,13 @@ typedef struct { ...@@ -68,12 +68,13 @@ typedef struct {
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
int64_t tbUid;
SSubmitMsg* pMsg; SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pMeta; SMeta* pMeta;
SArray* pColumnIdList; SArray* pColIdList;
} STqReadHandle; } STqReadHandle;
/* ------------------------ SVnode ------------------------ */ /* ------------------------ SVnode ------------------------ */
...@@ -199,8 +200,12 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); ...@@ -199,8 +200,12 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) { static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) {
pReadHandle->pColumnIdList = pColumnIdList; pReadHandle->pColIdList = pColIdList;
}
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
pHandle->tbUid = tbUid;
} }
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
#include "tcompare.h"
// static // static
// read next version data // read next version data
...@@ -424,7 +425,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { ...@@ -424,7 +425,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
/*if (code < 0) {*/ /*if (code < 0) {*/
// TODO: error // TODO: error
/*}*/ /*}*/
// get msgType // get msgType
// if submitblk // if submitblk
...@@ -610,61 +611,91 @@ int tqItemSSize() { ...@@ -610,61 +611,91 @@ int tqItemSSize() {
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqCVConsumeReq* pReq = pMsg->pCont;
SRpcMsg rpcMsg;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t reqOffset = pReq->offset; int64_t reqOffset = pReq->offset;
int64_t fetchOffset = reqOffset; int64_t fetchOffset = reqOffset;
int64_t blockingTime = pReq->blockingTime; int64_t blockingTime = pReq->blockingTime;
int rspLen = 0;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0 ; i < sz; i++) { for (int i = 0; i < sz; i++) {
STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i); STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
int8_t pos = fetchOffset % TQ_BUFFER_SIZE; int8_t pos;
int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); int8_t skip = 0;
if (old == 1) { SWalHead* pHead;
// do nothing
continue;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
return -1;
}
SWalHead* pHead = pTopic->pReadhandle->pHead;
while (1) { while (1) {
pos = fetchOffset % TQ_BUFFER_SIZE;
skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
if (skip == 1) {
// do nothing
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
// read until find TDMT_VND_SUBMIT // read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
return -1; atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
} }
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
fetchOffset++;
} }
if (skip == 1) continue;
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pTopic->buffer.output[pos].task; qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont);
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
//SArray<SSDataBlock>
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
break;
}
if (pDataBlock != NULL) {
taosArrayPush(pRes, pDataBlock);
} else {
break;
}
}
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
fetchOffset++;
continue;
} }
// TODO: launch query and get output data
pTopic->buffer.output[pos].dst = pDataBlock; pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
|| pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset; pTopic->buffer.firstOffset = pReq->offset;
} }
if (pTopic->buffer.lastOffset == -1 if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
|| pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset; pTopic->buffer.lastOffset = pReq->offset;
} }
atomic_store_8(&pTopic->buffer.output[pos].status, 1);
// put output into rsp // put output into rsp
} }
// launch query // launch query
// get result // get result
SMqCvConsumeRsp* pRsp;
return 0; return 0;
} }
...@@ -673,14 +704,14 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { ...@@ -673,14 +704,14 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if (pConsumer == NULL) { if (pConsumer == NULL) {
return -1; return -1;
} }
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) { if (pTopic == NULL) {
free(pConsumer); free(pConsumer);
return -1; return -1;
} }
strcpy(pTopic->topicName, pReq->topicName); strcpy(pTopic->topicName, pReq->topicName);
strcpy(pTopic->cgroup, pReq->cgroup); strcpy(pTopic->cgroup, pReq->cgroup);
strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan);
...@@ -689,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { ...@@ -689,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) { if (pTopic->pReadhandle == NULL) {
} }
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
...@@ -708,7 +738,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -708,7 +738,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->pMeta = pMeta; pReadHandle->pMeta = pMeta;
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColumnIdList = NULL; pReadHandle->pColIdList = NULL;
return NULL; return NULL;
} }
...@@ -720,20 +750,18 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve ...@@ -720,20 +750,18 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) {
return false; if (pHandle->tbUid == pHandle->pBlock->uid) return true;
} }
return true; return false;
} }
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
SMemRow row; /*int32_t sversion = pHandle->pBlock->sversion;*/
int32_t sversion = pHandle->pBlock->sversion; /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->numOfCols = pSchema->nCols;
pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid; pBlockInfo->uid = pHandle->pBlock->uid;
// TODO: filter out unused column
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册