提交 72fba5a8 编写于 作者: L Liu Jicong

fix(stream): check delete uid

上级 8972b64c
...@@ -116,6 +116,7 @@ enum { ...@@ -116,6 +116,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY, STREAM_INPUT__DESTROY,
}; };
......
...@@ -125,6 +125,14 @@ typedef struct { ...@@ -125,6 +125,14 @@ typedef struct {
SArray* blocks; // SArray<SSDataBlock> SArray* blocks; // SArray<SSDataBlock>
} SStreamDataBlock; } SStreamDataBlock;
// ref data block, for delete
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SSDataBlock* pBlock;
} SStreamRefDataBlock;
typedef struct { typedef struct {
int8_t type; int8_t type;
} SStreamCheckpoint; } SStreamCheckpoint;
...@@ -339,7 +347,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem ...@@ -339,7 +347,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem); // qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) { } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
...@@ -492,7 +501,9 @@ typedef struct { ...@@ -492,7 +501,9 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
......
...@@ -59,7 +59,7 @@ static void destroySTqHandle(void* data) { ...@@ -59,7 +59,7 @@ static void destroySTqHandle(void* data) {
tqCloseReader(pData->execHandle.pExecReader); tqCloseReader(pData->execHandle.pExecReader);
walCloseReader(pData->pWalReader); walCloseReader(pData->pWalReader);
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid); taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
walCloseReader(pData->pWalReader); walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pExecReader); tqCloseReader(pData->execHandle.pExecReader);
} }
...@@ -664,7 +664,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe ...@@ -664,7 +664,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
req.newConsumerId, req.oldConsumerId); req.newConsumerId, req.oldConsumerId);
} }
ASSERT(req.newConsumerId != -1); if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %ld", req.vgId, req.newConsumerId);
return 0;
}
STqHandle tqHandle = {0}; STqHandle tqHandle = {0};
pHandle = &tqHandle; pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/ /*taosInitRWLatch(&pExec->lock);*/
...@@ -876,6 +879,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -876,6 +879,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
*pRef = 1;
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
...@@ -885,6 +891,33 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -885,6 +891,33 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
continue;
}
} else {
streamTaskInputFail(pTask);
}
}
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDelBlock);
taosMemoryFree(pRef);
}
#if 0
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
...@@ -908,6 +941,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -908,6 +941,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
} }
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
#endif
return 0; return 0;
} }
...@@ -1045,6 +1079,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1045,6 +1079,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
...@@ -1053,6 +1088,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1053,6 +1088,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
.code = 0, .code = 0,
}; };
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
tDeleteStreamRetrieveReq(&req);
return 0; return 0;
} else { } else {
return -1; return -1;
......
...@@ -1480,6 +1480,40 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1480,6 +1480,40 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} }
} }
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
STqReader* pReader = pInfo->tqReader;
int32_t rows = pSrc->info.rows;
blockDataEnsureCapacity(pDst, rows);
SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* startCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* endCol = (uint64_t*)pSrcEndCol->pData;
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pSrcUidCol->pData;
SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
int32_t j = 0;
for (int32_t i = 0; i < rows; i++) {
if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
colDataAppend(pDstStartCol, j, (const char*)&startCol[i], false);
colDataAppend(pDstEndCol, j, (const char*)&endCol[i], false);
colDataAppend(pDstUidCol, j, (const char*)&uidCol[i], false);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
j++;
}
}
pDst->info = pSrc->info;
pDst->info.rows = j;
return 0;
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -1568,6 +1602,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1568,6 +1602,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} break; } break;
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv"); printDataBlock(pBlock, "stream scan delete recv");
if (pInfo->tqReader) {
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
filterDelBlockByUid(pDelBlock, pBlock, pInfo);
pBlock = pDelBlock;
}
printDataBlock(pBlock, "stream scan delete recv filtered");
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes); generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
......
...@@ -182,7 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -182,7 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
pReq->upstreamTaskId); pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueue(pTask, pReq, pRsp);
tFreeStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
if (exec) { if (exec) {
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
......
...@@ -179,5 +179,15 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -179,5 +179,15 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroy(pMerge->reqs); taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->dataRefs); taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge); taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
blockDataDestroy(pRefBlock->pBlock);
taosMemoryFree(pRefBlock->dataRef);
}
taosFreeQitem(pRefBlock);
} }
} }
...@@ -62,7 +62,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -62,7 +62,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0; return 0;
} }
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq) { void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree); taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen); taosArrayDestroy(pReq->dataLen);
} }
...@@ -95,7 +95,10 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { ...@@ -95,7 +95,10 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
return 0; return 0;
} }
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) { int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
int32_t code = -1;
SRetrieveTableRsp* pRetrieve = NULL; SRetrieveTableRsp* pRetrieve = NULL;
void* buf = NULL; void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
...@@ -143,7 +146,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -143,7 +146,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
buf = rpcMallocCont(sizeof(SMsgHead) + len); buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) { if (buf == NULL) {
goto FAIL; goto CLEAR;
} }
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId); ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
...@@ -151,6 +154,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -151,6 +154,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeStreamRetrieveReq(&encoder, &req); tEncodeStreamRetrieveReq(&encoder, &req);
tEncoderClear(&encoder);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.code = 0, .code = 0,
...@@ -161,17 +165,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -161,17 +165,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0); ASSERT(0);
return -1; goto CLEAR;
} }
buf = NULL;
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId, qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
} }
return 0; code = 0;
FAIL: CLEAR:
if (pRetrieve) taosMemoryFree(pRetrieve); taosMemoryFree(pRetrieve);
if (buf) taosMemoryFree(buf); rpcFreeCont(buf);
return -1; return code;
} }
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
......
...@@ -38,6 +38,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -38,6 +38,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SArray* blocks = pMerged->reqs; SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
qSetMultiStreamInput(exec, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册