未验证 提交 5f36406b 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14724 from taosdata/feature/stream

refactor(stream)
此差异已折叠。
......@@ -129,7 +129,7 @@ typedef struct {
static STqMgmt tqMgmt = {0};
// tqRead
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
......
......@@ -112,7 +112,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
};
tmsgSendRsp(&resp);
tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64
", rspOffset:%" PRId64,
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0;
......@@ -179,8 +180,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear(&decoder);
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
} else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version);
......@@ -316,9 +317,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
// 3.query
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
}
if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
......@@ -333,7 +336,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER;
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
......@@ -411,6 +414,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
taosMemoryFree(pCkHead);
#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
......@@ -421,6 +425,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
#endif
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
}
......
......@@ -59,15 +59,17 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0;
}
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan(task, pOffset) < 0) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--;
return 0;
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
......@@ -77,12 +79,21 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
pRsp->blockNum++;
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
} else {
pRsp->withTbName = 0;
}
}
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue;
}
pRsp->blockNum++;
continue;
}
void* meta = qStreamExtractMetaMsg(task);
......@@ -94,18 +105,19 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
ASSERT(0);
}
ASSERT(pRsp->rspOffset.type != 0);
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
}
ASSERT(pRsp->rspOffset.type != 0);
break;
}
return 0;
}
#if 0
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
......@@ -153,6 +165,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
return 0;
}
#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
......
......@@ -144,6 +144,7 @@ typedef struct {
void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
......@@ -298,10 +299,12 @@ typedef struct STableScanInfo {
uint64_t queryId; // todo remove it
uint64_t taskId; // todo remove it
#if 0
struct {
uint64_t uid;
int64_t ts;
} lastStatus;
#endif
int8_t scanMode;
int8_t noTable;
......
......@@ -336,8 +336,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
} else {
// switch to log
}
} else {
......@@ -353,6 +351,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return 0;
}
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......@@ -372,3 +371,4 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
#endif
......@@ -2463,7 +2463,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);
}
......@@ -2844,7 +2844,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
}
#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType;
......@@ -2930,6 +2930,7 @@ int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
return TSDB_CODE_SUCCESS;
}
#endif
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
......@@ -3341,8 +3342,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey
: pInfo->existNewGroupBlock->info.window.ekey;
int64_t ekey =
Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
......@@ -3678,14 +3679,14 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo);
taosMemoryFreeClear(param);
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param);
}
......@@ -3694,7 +3695,7 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->p);
taosMemoryFreeClear(param);
}
......@@ -3706,7 +3707,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
taosMemoryFreeClear(param);
}
......@@ -3724,7 +3725,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
......@@ -3743,7 +3744,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param);
}
......@@ -3972,7 +3973,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
taosMemoryFree(pInfo->pFillInfo);
taosMemoryFree(pInfo->p);
......@@ -4465,7 +4466,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo);
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
......@@ -4504,8 +4506,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo);
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
......@@ -4527,7 +4529,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
......@@ -4798,7 +4801,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserterParam->readHandle = readHandle;
*pParam = pInserterParam;
break;
}
......
......@@ -427,8 +427,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
// todo refactor
pTableScanInfo->lastStatus.uid = pBlock->info.uid;
pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;
/*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/
/*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.uid != 0);
return pBlock;
......@@ -1231,9 +1234,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
return 0;
}
return 0;
}
......@@ -1259,7 +1259,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
/*} else {*/
} else {
// data is filtered out, do clean
/*tDeleteSSDataBlock(&ret.data);*/
}
} else if (ret.fetchType == FETCH_TYPE__META) {
......@@ -1268,13 +1270,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
/*if (ret.offset.version == -1) {*/
/*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
/*}*/
return NULL;
} else {
ASSERT(0);
......@@ -1394,72 +1391,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL;
}
pInfo->pRes->info.rows = block.info.rows;
pInfo->pRes->info.uid = block.info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = block.info.rows;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = block.info.uid;
}
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
bool colExists = false;
for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
if (pResCol->info.colId == pColMatchInfo->colId) {
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
}
setBlockIntoRes(pInfo, &block);
// the required column does not exists in submit block, let's set it to be all null value
if (!colExists) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
}
}
taosArrayDestroy(block.pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return NULL;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
break;
}
......@@ -1489,12 +1422,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
/*ASSERT(0);*/
ASSERT(0);
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
#endif
} else {
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册