提交 8e15c644 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 1975e945
...@@ -113,14 +113,14 @@ typedef struct { ...@@ -113,14 +113,14 @@ typedef struct {
int64_t ver; int64_t ver;
int32_t* dataRef; int32_t* dataRef;
SPackedData submit; SPackedData submit;
} SStreamDataSubmit2; } SStreamDataSubmit;
typedef struct { typedef struct {
int8_t type; int8_t type;
int64_t ver; int64_t ver;
SArray* dataRefs; // SArray<int32_t*> SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit> SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit2; } SStreamMergedSubmit;
typedef struct { typedef struct {
int8_t type; int8_t type;
...@@ -209,10 +209,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { ...@@ -209,10 +209,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
void* streamQueueNextItem(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;
......
...@@ -183,7 +183,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); ...@@ -183,7 +183,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
// tq util // tq util
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData* pData);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -312,7 +312,6 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { ...@@ -312,7 +312,6 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1; return -1;
......
...@@ -120,8 +120,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -120,8 +120,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
// append the data for the stream // append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
} else { } else {
...@@ -145,17 +143,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -145,17 +143,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
noNewDataInWal = false; noNewDataInWal = false;
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); code = tqAddBlockNLaunchTask(pTask, &packData);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
...@@ -164,8 +154,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -164,8 +154,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
} }
streamDataSubmitDestroy(p);
taosFreeQitem(p);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
} }
......
...@@ -26,10 +26,15 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { ...@@ -26,10 +26,15 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
return taosStrdup(buf); return taosStrdup(buf);
} }
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData *pPackedData) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); SStreamDataSubmit* p = streamDataSubmitNew(pPackedData, STREAM_INPUT__DATA_SUBMIT);
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*) p);
streamDataSubmitDestroy(p);
taosFreeQitem(p);
if (code < 0) { if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, pPackedData->ver);
return -1; return -1;
} }
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) #define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
#define ONE_MB_F (1048576.0)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F)
int32_t streamInit() { int32_t streamInit() {
int8_t old; int8_t old;
...@@ -288,26 +291,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -288,26 +291,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type; int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); px->submit.msgLen, px->submit.ver, numOfBlocks, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
numOfBlocks, size); numOfBlocks, size);
streamDataSubmitDestroy(pSubmitBlock); return -1;
}
SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem);
if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1; return -1;
} }
...@@ -315,7 +318,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -315,7 +318,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
......
...@@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0; return 0;
} }
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
if (pDataSubmit == NULL) { if (pDataSubmit == NULL) {
return NULL; return NULL;
} }
...@@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { ...@@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
return NULL; return NULL;
} }
pDataSubmit->submit = submit; pDataSubmit->submit = *pData;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1 *pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = type; pDataSubmit->type = type;
return pDataSubmit; return pDataSubmit;
} }
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
...@@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { ...@@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
} }
} }
SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
if (pMerged == NULL) { if (pMerged == NULL) {
return NULL; return NULL;
} }
...@@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() { ...@@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() {
return pMerged; return pMerged;
} }
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->submits, &pSubmit->submit); taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver; pMerged->ver = pSubmit->ver;
return 0; return 0;
} }
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
int32_t len = 0; int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen; len = pSubmit->submit.msgLen;
} }
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }
streamDataSubmitRefInc(pSubmit); streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone; return pSubmitClone;
} }
...@@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* ...@@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(pElem); taosFreeQitem(pElem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc); streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(pElem); taosFreeQitem(pElem);
return dst; return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
// todo handle error // todo handle error
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst); taosFreeQitem(dst);
taosFreeQitem(pElem); taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged; return (SStreamQueueItem*)pMerged;
...@@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitDestroy((SStreamDataSubmit2*)data); streamDataSubmitDestroy((SStreamDataSubmit*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->submits); int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
......
...@@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgLen, pSubmit->submit.ver);
...@@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* pBlockList = pMerged->submits; SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
...@@ -366,11 +366,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -366,11 +366,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->blocks = pRes; qRes->blocks = pRes;
if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver; qRes->sourceVer = pMerged->ver;
} }
......
...@@ -105,3 +105,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { ...@@ -105,3 +105,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
return (SStreamQueueRes){0}; return (SStreamQueueRes){0};
} }
#endif #endif
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
// todo refactor:
// read data from input queue
typedef struct SQueueReader {
SStreamQueue* pQueue;
int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
int32_t numOfBlocks = 0;
int32_t tryCount = 0;
SStreamQueueItem* pRet = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
if (qItem == NULL) {
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++;
taosMsleep(1);
qDebug("===stream===try again batchSize:%d", numOfBlocks);
continue;
}
qDebug("===stream===break batchSize:%d", numOfBlocks);
break;
}
if (pRet == NULL) {
pRet = qItem;
streamQueueProcessSuccess(pReader->pQueue);
if (pReader->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) {
streamQueueProcessFail(pReader->pQueue);
break;
} else {
numOfBlocks++;
pRet = newRet;
streamQueueProcessSuccess(pReader->pQueue);
if (numOfBlocks > pReader->maxBlocks) {
qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
break;
}
}
}
}
return pRet;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册