提交 b6f55fa0 编写于 作者: L Liu Jicong

refactor(stream): batch optimization for submit msg

上级 4dad4138
......@@ -54,12 +54,12 @@ enum {
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
typedef enum EStreamType {
......
......@@ -77,6 +77,13 @@ typedef struct {
SSubmitReq* data;
} SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
typedef struct {
int8_t type;
......
......@@ -77,6 +77,27 @@ FAIL:
return NULL;
}
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
return pMerged;
FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, pSubmit->dataRef);
taosArrayPush(pMerged->reqs, pSubmit->data);
pMerged->ver = pSubmit->ver;
return 0;
}
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}
......@@ -102,10 +123,26 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) {
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(elem);
return 0;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
return 0;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
taosFreeQitem(dst);
taosFreeQitem(elem);
return 0;
} else {
return -1;
......@@ -123,5 +160,20 @@ void streamFreeQitem(SStreamQueueItem* data) {
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGet(pMerge->dataRefs, i);
(*ref)--;
if (*ref == 0) {
void* data = taosArrayGet(pMerge->reqs, i);
taosMemoryFree(data);
taosMemoryFree(ref);
}
}
taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
}
}
......@@ -33,9 +33,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
return 0;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_SUBMIT, false);
} else {
ASSERT(0);
}
// exec
......@@ -155,11 +158,9 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
} else {
break;
}
/*}*/
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
......@@ -168,11 +169,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
cnt++;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
}
}
}
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
......@@ -194,6 +194,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
return NULL;
......@@ -201,6 +202,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册