提交 069778c6 编写于 作者: 5 54liuyao

feat(stream): add log

上级 e4152b43
...@@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|======\n", "dumpBlockData", len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid); pDataBlock->info.uid);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
......
...@@ -1353,13 +1353,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1353,13 +1353,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
if (chIds && pPullDataMap) { if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds; SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy); int32_t size = taosArrayGetSize(chAy);
qDebug("window %" PRId64 " wait child size:%d", win.skey, size); qDebug("===stream===window %" PRId64 " wait child size:%d", win.skey, size);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
qDebug("window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); qDebug("===stream===window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
} }
continue; continue;
} else if (pPullDataMap) { } else if (pPullDataMap) {
qDebug("close window %" PRId64, win.skey); qDebug("===stream===close window %" PRId64, win.skey);
} }
SResultRowPosition* pPos = (SResultRowPosition*)pIte; SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
...@@ -2482,7 +2482,9 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2482,7 +2482,9 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request // add pull data request
taosArrayPush(pInfo->pPullWins, &pull); taosArrayPush(pInfo->pPullWins, &pull);
addPullWindow(pInfo->pPullDataMap, &winRes, taosArrayGetSize(pInfo->pChildren)); int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
} else { } else {
int32_t index = -1; int32_t index = -1;
SArray* chArray = NULL; SArray* chArray = NULL;
...@@ -2492,14 +2494,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2492,14 +2494,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
chId = getChildIndex(pSDataBlock); chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
} }
if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) { // if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
qDebug("======delete child id %d", chId); // qDebug("===stream===delete child id %d", chId);
taosArrayRemove(chArray, index); // taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) { // if (taosArrayGetSize(chArray) == 0) {
// pull data is over // // pull data is over
taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); // taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
} // }
} // }
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) { if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
ignore = false; ignore = false;
} }
...@@ -2623,6 +2625,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -2623,6 +2625,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
SArray* chArray = *(SArray**)chIds; SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
if (index != -1) { if (index != -1) {
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
taosArrayRemove(chArray, index); taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) { if (taosArrayGetSize(chArray) == 0) {
// pull data is over // pull data is over
...@@ -2641,7 +2644,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2641,7 +2644,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -2657,18 +2660,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2657,18 +2660,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
return NULL; return NULL;
} }
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} else { } else {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false; pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo)); ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
...@@ -2676,13 +2679,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2676,13 +2679,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pPullDataRes->info.rows != 0) { if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
ASSERT(IS_FINAL_OP(pInfo)); ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
} }
...@@ -2693,10 +2696,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2693,10 +2696,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins); removeDeleteResults(pUpdated, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
...@@ -2771,6 +2774,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2771,6 +2774,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info; SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info;
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
taosArrayPush(pInfo->pChildren, &pChildOp); taosArrayPush(pInfo->pChildren, &pChildOp);
qDebug("===stream===add child, id:%d", chIndex);
} }
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
...@@ -2795,14 +2799,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2795,14 +2799,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false; pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo)); ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// process the rest of the data // process the rest of the data
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
...@@ -2811,14 +2815,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2811,14 +2815,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pPullDataRes->info.rows != 0) { if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
ASSERT(IS_FINAL_OP(pInfo)); ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
// ASSERT(false); // ASSERT(false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册