提交 24bc7156 编写于 作者: L Liu Jicong

refactor(stream): batch optimization for submit msg

上级 5200a481
...@@ -51,10 +51,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -51,10 +51,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
/*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/ /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
/*return TSDB_CODE_QRY_APP_ERROR;*/ /*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/ /*}*/
taosArrayClear(pInfo->pBlockLists); if (numOfBlocks == 1) {
for (int32_t i = 0; i < numOfBlocks; i++) { taosArrayPush(pInfo->pBlockLists, &input);
SSubmitReq* pReq = POINTER_SHIFT(input, i * sizeof(void*)); } else {
taosArrayPush(pInfo->pBlockLists, &pReq); for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
}
} }
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
......
...@@ -1433,7 +1433,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1433,7 +1433,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
break; break;
} else { } else {
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->pMsg = NULL;
return NULL; continue;
} }
/*blockDataCleanup(pInfo->pRes);*/ /*blockDataCleanup(pInfo->pRes);*/
} }
......
...@@ -81,7 +81,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() { ...@@ -81,7 +81,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
if (pMerged == NULL) return NULL; if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(int32_t*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT; pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged; return pMerged;
...@@ -93,7 +93,7 @@ FAIL: ...@@ -93,7 +93,7 @@ FAIL:
} }
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, pSubmit->dataRef); taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->reqs, &pSubmit->data); taosArrayPush(pMerged->reqs, &pSubmit->data);
pMerged->ver = pSubmit->ver; pMerged->ver = pSubmit->ver;
return 0; return 0;
...@@ -165,7 +165,7 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -165,7 +165,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs); int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGet(pMerge->dataRefs, i); int32_t* ref = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--; (*ref)--;
if (*ref == 0) { if (*ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i); void* data = taosArrayGetP(pMerge->reqs, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册