未验证 提交 cc80848b 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9935 from taosdata/feature/tq

refine tqRead interface
...@@ -1520,7 +1520,7 @@ typedef struct SMqSetCVgReq { ...@@ -1520,7 +1520,7 @@ typedef struct SMqSetCVgReq {
int32_t vgId; int32_t vgId;
int64_t consumerId; int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
...@@ -1532,7 +1532,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* ...@@ -1532,7 +1532,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->cGroup); tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
...@@ -1543,7 +1543,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { ...@@ -1543,7 +1543,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->topicName);
buf = taosDecodeStringTo(buf, pReq->cGroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
......
...@@ -100,7 +100,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -100,7 +100,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
.vgId = pCEp->vgId, .vgId = pCEp->vgId,
.consumerId = consumerId, .consumerId = consumerId,
}; };
strcpy(req.cGroup, cgroup); strcpy(req.cgroup, cgroup);
strcpy(req.topicName, topic); strcpy(req.topicName, topic);
strcpy(req.sql, pTopic->sql); strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.logicalPlan, pTopic->logicalPlan);
...@@ -168,7 +168,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -168,7 +168,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
.vgId = vgId, .vgId = vgId,
.consumerId = pConsumer->consumerId, .consumerId = pConsumer->consumerId,
}; };
strcpy(req.cGroup, pConsumer->cgroup); strcpy(req.cgroup, pConsumer->cgroup);
strcpy(req.topicName, pTopic->name); strcpy(req.topicName, pTopic->name);
strcpy(req.sql, pTopic->sql); strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.logicalPlan, pTopic->logicalPlan);
......
...@@ -297,7 +297,7 @@ typedef struct STQ { ...@@ -297,7 +297,7 @@ typedef struct STQ {
STqCfg* tqConfig; STqCfg* tqConfig;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
SWal * pWal; SWal* pWal;
} STQ; } STQ;
typedef struct STqMgmt { typedef struct STqMgmt {
...@@ -331,7 +331,8 @@ int tqRegisterContext(STqGroup*, void* ahandle); ...@@ -331,7 +331,8 @@ int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset); int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif #endif
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
...@@ -340,13 +341,15 @@ typedef struct STqReadHandle { ...@@ -340,13 +341,15 @@ typedef struct STqReadHandle {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pMeta; SMeta* pMeta;
SArray* pColumnIdList;
} STqReadHandle; } STqReadHandle;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList);
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle* pHandle); bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
// return SArray<SColumnInfoData> // return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -606,7 +606,7 @@ int tqItemSSize() { ...@@ -606,7 +606,7 @@ int tqItemSSize() {
return 0; return 0;
} }
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqCVConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
...@@ -623,6 +623,7 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -623,6 +623,7 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
if (old == 1) { if (old == 1) {
// do nothing // do nothing
continue;
} }
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
// TODO // TODO
...@@ -635,7 +636,17 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -635,7 +636,17 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
// TODO: launch query and get output data
void* outputData; void* outputData;
pHandle->buffer.output[pos].dst = outputData;
if (pHandle->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset;
}
if (pHandle->buffer.lastOffset == -1
|| pReq->offset > pHandle->buffer.lastOffset) {
pHandle->buffer.lastOffset = pReq->offset;
}
atomic_store_8(&pHandle->buffer.output[pos].status, 1); atomic_store_8(&pHandle->buffer.output[pos].status, 1);
// put output into rsp // put output into rsp
...@@ -647,18 +658,64 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -647,18 +658,64 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
return 0; return 0;
} }
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) { int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
if (pConsumer == NULL) {
return -1;
}
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) {
free(pConsumer);
return -1;
}
strcpy(pTopic->topicName, pReq->topicName);
strcpy(pTopic->cgroup, pReq->cgroup);
strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0;
}
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta
return 0;
}
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
return NULL; return NULL;
} }
pReadHandle->pMeta = pMeta; pReadHandle->pMeta = pMeta;
pReadHandle->pMsg = pMsg; pReadHandle->pMsg = NULL;
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColumnIdList = pColumnIdList;
return NULL; return NULL;
} }
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
}
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
...@@ -676,7 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) ...@@ -676,7 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
// TODO: filter out unused column // TODO: filter out unused column
return 0; return 0;
} }
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion; int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
...@@ -691,11 +748,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { ...@@ -691,11 +748,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
return NULL; return NULL;
} }
for (int i = 0; i < pTschema->numOfCols; i++) {
// TODO: filter out unused column
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
}
SMemRow row; SMemRow row;
int32_t kvIdx; int32_t kvIdx;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
......
...@@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessConsume(pVnode->pTq, pMsg, pRsp); return tqProcessConsumeReq(pVnode->pTq, pMsg, pRsp);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnd.h"
#include "tq.h" #include "tq.h"
#include "vnd.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
...@@ -34,7 +34,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -34,7 +34,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version // ser request version
void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int64_t ver = pVnode->state.processed++; int64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver); taosEncodeFixedU64(&pBuf, ver);
...@@ -53,7 +53,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -53,7 +53,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq; SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq; SVCreateTbBatchReq vCreateTbBatchReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen); void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) { if (ptr == NULL) {
// TODO: handle error // TODO: handle error
} }
...@@ -110,43 +110,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -110,43 +110,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
//TODO: wrap in a function
char* reqStr = ptr;
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(reqStr, &req); tDecodeSMqSetCVgReq(ptr, &req);
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); if (tqProcessSetConnReq(pVnode->pTq, &req) < 0) {
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) {
// TODO: handle error
}
strcpy(pTopic->topicName, req.topicName);
strcpy(pTopic->cgroup, req.cGroup);
strcpy(pTopic->sql, req.sql);
strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pTopic->physicalPlan, req.physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0;
} }
pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal); } break;
// write mq meta
}
break;
default: default:
ASSERT(0); ASSERT(0);
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册