未验证 提交 f2f7590c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16975 from taosdata/feature/stream

fix(stream): check delete uid
......@@ -116,6 +116,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY,
};
......
......@@ -125,6 +125,14 @@ typedef struct {
SArray* blocks; // SArray<SSDataBlock>
} SStreamDataBlock;
// ref data block, for delete
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SSDataBlock* pBlock;
} SStreamRefDataBlock;
typedef struct {
int8_t type;
} SStreamCheckpoint;
......@@ -339,7 +347,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
......@@ -492,7 +501,9 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask);
......
......@@ -59,7 +59,7 @@ static void destroySTqHandle(void* data) {
tqCloseReader(pData->execHandle.pExecReader);
walCloseReader(pData->pWalReader);
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pExecReader);
}
......@@ -664,7 +664,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
req.newConsumerId, req.oldConsumerId);
}
ASSERT(req.newConsumerId != -1);
if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %ld", req.vgId, req.newConsumerId);
return 0;
}
STqHandle tqHandle = {0};
pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
......@@ -876,6 +879,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
taosArrayDestroy(pRes->uidList);
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
*pRef = 1;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
......@@ -885,6 +891,33 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
continue;
}
} else {
streamTaskInputFail(pTask);
}
}
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDelBlock);
taosMemoryFree(pRef);
}
#if 0
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -908,6 +941,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}
}
blockDataDestroy(pDelBlock);
#endif
return 0;
}
......@@ -1045,6 +1079,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
......@@ -1053,6 +1088,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
tDeleteStreamRetrieveReq(&req);
return 0;
} else {
return -1;
......
......@@ -1480,6 +1480,40 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
}
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
STqReader* pReader = pInfo->tqReader;
int32_t rows = pSrc->info.rows;
blockDataEnsureCapacity(pDst, rows);
SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* startCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* endCol = (uint64_t*)pSrcEndCol->pData;
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pSrcUidCol->pData;
SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
int32_t j = 0;
for (int32_t i = 0; i < rows; i++) {
if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
colDataAppend(pDstStartCol, j, (const char*)&startCol[i], false);
colDataAppend(pDstEndCol, j, (const char*)&endCol[i], false);
colDataAppend(pDstUidCol, j, (const char*)&uidCol[i], false);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
j++;
}
}
pDst->info = pSrc->info;
pDst->info.rows = j;
return 0;
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -1568,6 +1602,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} break;
case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv");
if (pInfo->tqReader) {
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
filterDelBlockByUid(pDelBlock, pBlock, pInfo);
pBlock = pDelBlock;
}
printDataBlock(pBlock, "stream scan delete recv filtered");
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
......
......@@ -182,7 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp);
tFreeStreamDispatchReq(pReq);
tDeleteStreamDispatchReq(pReq);
if (exec) {
if (streamTryExec(pTask) < 0) {
......
......@@ -179,5 +179,15 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
blockDataDestroy(pRefBlock->pBlock);
taosMemoryFree(pRefBlock->dataRef);
}
taosFreeQitem(pRefBlock);
}
}
......@@ -62,7 +62,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0;
}
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq) {
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen);
}
......@@ -95,7 +95,10 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
return 0;
}
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
int32_t code = -1;
SRetrieveTableRsp* pRetrieve = NULL;
void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
......@@ -143,7 +146,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) {
goto FAIL;
goto CLEAR;
}
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
......@@ -151,6 +154,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeStreamRetrieveReq(&encoder, &req);
tEncoderClear(&encoder);
SRpcMsg rpcMsg = {
.code = 0,
......@@ -161,17 +165,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0);
return -1;
goto CLEAR;
}
buf = NULL;
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
}
return 0;
FAIL:
if (pRetrieve) taosMemoryFree(pRetrieve);
if (buf) taosMemoryFree(buf);
return -1;
code = 0;
CLEAR:
taosMemoryFree(pRetrieve);
rpcFreeCont(buf);
return code;
}
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
......
......@@ -38,6 +38,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
qSetMultiStreamInput(exec, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else {
ASSERT(0);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册