提交 a717042a 编写于 作者: L Liu Jicong

refactor(stream): simple batch

上级 eb49ec97
...@@ -307,7 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem ...@@ -307,7 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1; return -1;
} }
qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
......
...@@ -357,16 +357,16 @@ SArray *vmGetMsgHandles() { ...@@ -357,16 +357,16 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -653,11 +653,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -653,11 +653,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) { if (pBlock == NULL) {
qInfo("======printDataBlock Block is Null"); qDebug("======printDataBlock Block is Null");
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;
qInfo("%s", dumpBlockData(pBlock, flag, &pBuf)); qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
......
...@@ -80,7 +80,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -80,7 +80,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
qInfo("stream exec over, queue empty"); qDebug("stream exec over, queue empty");
break; break;
} }
if (data == NULL) { if (data == NULL) {
...@@ -101,9 +101,9 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -101,9 +101,9 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
} }
if (data == NULL) break; if (data == NULL) break;
qInfo("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt); qDebug("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
qInfo("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (pTask->taskStatus == TASK_STATUS__DROPPING) {
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册