未验证 提交 78222a17 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11706 from taosdata/feature/tq

refactor(tmq): rewrite tq read function
...@@ -30,6 +30,7 @@ typedef struct SPlanContext { ...@@ -30,6 +30,7 @@ typedef struct SPlanContext {
SNode* pAstRoot; SNode* pAstRoot;
bool topicQuery; bool topicQuery;
bool streamQuery; bool streamQuery;
bool rSmaQuery;
bool showRewrite; bool showRewrite;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
......
...@@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -34,6 +34,54 @@ ...@@ -34,6 +34,54 @@
extern bool tsStreamSchedV; extern bool tsStreamSchedV;
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
SNode* pAst = NULL;
SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS;
if (nodesStringToNode(ast, &pAst) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.rSmaQuery = true,
.triggerType = triggerType,
.watermark = watermark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (qSubPlanToString(plan, pStr, pLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
END:
if (pAst) nodesDestroyNode(pAst);
if (pPlan) nodesDestroyNode(pPlan);
return terrno;
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
......
...@@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
// need to reposition // need to reposition
......
...@@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) {
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
/*int32_t sversion = pHandle->pBlock->sversion;*/ /*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
...@@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
STSchema* pTschema = pHandle->pSchema; STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows; *pNumOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/ /*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
...@@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNumNeed = pSchemaWrapper->nCols; colNumNeed = pSchemaWrapper->nCols;
} }
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); *ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) { if (*ppCols == NULL) {
return NULL; return -1;
} }
int32_t colMeta = 0; int32_t colMeta = 0;
int32_t colNeed = 0; int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
...@@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNeed++; colNeed++;
} else { } else {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
/*int sz = numOfRows * pColSchema->bytes;*/
colInfo.info.bytes = pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId; colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type; colInfo.info.type = pColSchema->type;
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); goto FAIL;
return NULL;
} }
taosArrayPush(pArray, &colInfo); taosArrayPush(*ppCols, &colInfo);
colMeta++; colMeta++;
colNeed++; colNeed++;
} }
} }
int32_t colActual = taosArrayGetSize(*ppCols);
// TODO in stream shuffle case, fetch groupId
*pGroupId = 0;
STSRowIter iter = {0}; STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
...@@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray); for (int32_t i = 0; i < colActual; i++) {
for (int32_t i = 0; i < colTot; i++) { SColumnInfoData* pColData = taosArrayGet(*ppCols, i);
SColumnInfoData* pColData = taosArrayGet(pArray, i);
SCellVal sVal = {0}; SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break; break;
} }
/*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); goto FAIL;
return NULL;
} }
} }
curRow++; curRow++;
} }
return pArray; return 0;
FAIL:
taosArrayDestroy(*ppCols);
return -1;
} }
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册