提交 482c319b 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 3ef7d43d
...@@ -223,27 +223,12 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { ...@@ -223,27 +223,12 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
return queue->qItem; return queue->qItem;
} }
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue);
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQueueCurItem(queue);
}
}
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit); SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;
......
...@@ -1431,7 +1431,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1431,7 +1431,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
} }
if (pSubmit) { if (pSubmit) {
streamDataSubmitRefDec(pSubmit); streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit); taosFreeQitem(pSubmit);
} }
......
...@@ -30,7 +30,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm ...@@ -30,7 +30,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
// update processed // update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit); streamDataSubmitDestroy(pSubmit);
if (pRsp->blockNum > 0) { if (pRsp->blockNum > 0) {
*ppSubmit = pSubmit; *ppSubmit = pSubmit;
return 0; return 0;
...@@ -58,7 +58,7 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { ...@@ -58,7 +58,7 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
} }
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit); streamDataSubmitDestroy(pSubmit);
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break; if (pSubmit == NULL) break;
} }
...@@ -120,7 +120,7 @@ int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHan ...@@ -120,7 +120,7 @@ int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHan
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) { if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone(pSubmit); SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return -1; return -1;
} }
......
...@@ -279,18 +279,18 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -279,18 +279,18 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type; int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem); SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) { if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1; return -1;
} }
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
pTask, pItem, pSubmitClone, pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitClone->submit.ver, pTask->inputQueue->queue->numOfItems); pSubmitBlock->submit.ver, pTask->inputQueue->queue->numOfItems);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
...@@ -309,4 +309,20 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -309,4 +309,20 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif #endif
return 0; return 0;
}
void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQueueCurItem(queue);
}
} }
\ No newline at end of file
...@@ -48,10 +48,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -48,10 +48,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
if (pArray == NULL) { if (pArray == NULL) {
return -1; return -1;
} }
taosArrayPush(pArray, &(SSDataBlock){0}); taosArrayPush(pArray, &(SSDataBlock){0});
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data); blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
...@@ -79,25 +81,40 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { ...@@ -79,25 +81,40 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
} }
pDataSubmit->submit = submit; pDataSubmit->submit = submit;
*pDataSubmit->dataRef = 1; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
return pDataSubmit; return pDataSubmit;
} }
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
if (ref == 0) {
taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef);
}
}
SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit2* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0);
if (pMerged == NULL) {
return NULL;
}
if (pMerged == NULL) return NULL;
pMerged->submits = taosArrayInit(0, sizeof(SPackedData)); pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL;
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
taosArrayDestroy(pMerged->submits);
taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
pMerged->type = STREAM_INPUT__MERGED_SUBMIT; pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged; return pMerged;
FAIL:
if (pMerged->submits) taosArrayDestroy(pMerged->submits);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
} }
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
...@@ -111,26 +128,17 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) ...@@ -111,26 +128,17 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit)
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) { SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }
streamDataSubmitRefInc(pSubmit); streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
return pSubmitClone; return pSubmitClone;
} }
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef);
}
}
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem); ASSERT(elem);
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
...@@ -168,7 +176,7 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -168,7 +176,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit2*)data); streamDataSubmitDestroy((SStreamDataSubmit2*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) { } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册