提交 48894c8b 编写于 作者: L Liu Jicong

enh(stream): new api for stream queue

上级 ec7df423
......@@ -72,7 +72,7 @@ SHOW STREAMS;
若要展示更详细的信息,可以使用:
```sql
SELECT * from performance_schema.`perf_streams`;
SELECT * from information_schema.`ins_streams`;
```
## 流式计算的触发模式
......
......@@ -140,15 +140,40 @@ typedef struct {
int8_t type;
} SStreamCheckpoint;
typedef struct {
int8_t type;
} SStreamTaskDestroy;
typedef struct {
int8_t type;
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode {
SStreamQueueItem* item;
SStreamQueueNode* next;
};
typedef struct {
SStreamQueueNode* head;
int64_t size;
} SStreamQueueRes;
void streamFreeQitem(SStreamQueueItem* data);
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
typedef struct {
SStreamQueueNode* pHead;
} SStreamQueue1;
bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
typedef struct {
STaosQueue* queue;
STaosQall* qall;
......
......@@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pConsumer->lock);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
goto FAIL;
}
......
......@@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pDbObj);
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/
......@@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......
......@@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1;
}
}
......
......@@ -915,6 +915,11 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
}
pDest->info.rows++;
if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
void* tbname = NULL;
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname);
} else {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
......@@ -944,6 +949,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
blockDataDestroy(pResBlock);
}
}
}
taosArrayDestroy(pParInfo->rowIds);
pParInfo->rowIds = NULL;
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
......
......@@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) {
return NULL;
......@@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
varDataSetLen(tbname, strlen(varDataVal(tbname)));
tdbFree(parTbname);
}
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
tbname[0] == 0 ? NULL : tbname);
......@@ -1928,6 +1929,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, consume block %d", totBlockNum);
return NULL;
}
......@@ -2562,7 +2564,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
uint32_t status = 0;
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2893,7 +2895,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error;
}
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
......
......@@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus
}
......
......@@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) {
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
}
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
return true;
}
int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; }
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; }
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) {
SStreamQueueNode* pRet = pRes->head;
pRes->head = pRes->head->next;
return pRet;
}
void streamQueueResClear(SStreamQueueRes* pRes) {
while (pRes->head) {
SStreamQueueNode* pNode = pRes->head;
streamFreeQitem(pRes->head->item);
pRes->head = pNode;
}
}
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) {
int64_t size = 0;
SStreamQueueNode* head = NULL;
while (pTail) {
SStreamQueueNode* pTmp = pTail->next;
pTail->next = head;
head = pTail;
pTail = pTmp;
size++;
}
return (SStreamQueueRes){.head = head, .size = size};
}
bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); }
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) {
SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode));
pNode->item = pItem;
SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead);
while (1) {
pNode->next = pHead;
SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode);
if (pOld == pHead) {
break;
}
}
return 0;
}
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL);
if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0};
}
......@@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables
if $rows != 1 then
return -1
endi
#sql select * from performance_schema.perf_streams
#sql select * frominformation_schema.ins_streams
sql select * from information_schema.ins_tables
if $rows <= 0 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册