提交 8a6e4b87 编写于 作者: L Liu Jicong

refactor(stream): simple batch

上级 ba8d02e6
......@@ -186,9 +186,12 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner);
void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
#ifdef __cplusplus
}
#endif
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "os.h"
#include "query.h"
#include "tdatablock.h"
......@@ -120,7 +121,6 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(0);
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
......@@ -309,12 +309,16 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__TRIGGER) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
......
......@@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
# )
target_link_libraries(executor
PRIVATE os util common function parser planner qcom vnode scalar nodes index
PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
)
target_include_directories(
......
......@@ -39,6 +39,7 @@ extern "C" {
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "tstream.h"
#include "vnode.h"
#include "executorInt.h"
......@@ -139,12 +140,14 @@ typedef struct STaskIdInfo {
} STaskIdInfo;
typedef struct {
//TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
SStreamQueue* inputQueue;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
......
......@@ -60,8 +60,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
}
/*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/
/*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/
} else {
ASSERT(0);
}
......
......@@ -44,6 +44,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error;
}
if (model == OPTR_EXEC_MODEL_STREAM) {
(*pTask)->streamInfo.inputQueue = streamQueueOpen();
if ((*pTask)->streamInfo.inputQueue == NULL) {
goto _error;
}
}
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) {
......@@ -252,6 +259,13 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
return 0;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......
......@@ -1202,15 +1202,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
taosArrayDestroy(pBlock->pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return -1;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
......@@ -1231,11 +1222,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
/*pTaskInfo->code = pOperator->fpSet._openFn(pOperator);*/
/*if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {*/
/*return NULL;*/
/*}*/
qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
......@@ -1425,15 +1411,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
qDebug("scan rows: %d", pBlockInfo->rows);
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
ASSERT(0);
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
#endif
} else {
ASSERT(0);
return NULL;
......
......@@ -653,11 +653,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) {
qDebug("======printDataBlock Block is Null");
qInfo("======printDataBlock Block is Null");
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
qInfo("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFree(pBuf);
}
......
......@@ -13,4 +13,4 @@ target_link_libraries(qworker
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
endif(${BUILD_TEST})
......@@ -42,6 +42,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus
}
#endif
......
......@@ -97,3 +97,29 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
taosMemoryFree(pDataSubmit->dataRef);
}
}
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
return 0;
} else {
return -1;
}
}
void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
}
......@@ -251,8 +251,8 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
// serialize
int32_t tlen;
......@@ -298,6 +298,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) {
qDebug("stream stop dispatching since no output: task %d", pTask->taskId);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
......
......@@ -75,10 +75,35 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
while (1) {
void* data = streamQueueNextItem(pTask->inputQueue);
int32_t cnt = 0;
void* data = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qInfo("stream exec over, queue empty");
break;
}
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
continue;
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
}
}
if (data == NULL) break;
qInfo("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes);
qInfo("stream task %d exec end", pTask->taskId);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
......@@ -95,27 +120,16 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue);
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(qRes);
return NULL;
}
streamQueueProcessSuccess(pTask->inputQueue);
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
streamFreeQitem(data);
}
return pRes;
}
......@@ -129,6 +143,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
// first run
qDebug("stream exec, enter exec status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
......@@ -136,11 +151,13 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
qDebug("stream exec, return result");
return 0;
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册