提交 7d8a2822 编写于 作者: 5 54liuyao

fix(stream):tqBuildDeleteReq error

上级 f2538701
...@@ -99,6 +99,8 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); ...@@ -99,6 +99,8 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
// char* streamStateSessionDump(SStreamState* pState);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -42,7 +42,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -42,7 +42,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
if (metaGetTableEntryByName(&mr, name) < 0) { if (metaGetTableEntryByName(&mr, name) < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(name); taosMemoryFree(name);
return -1; continue;
} }
int64_t uid = mr.me.uid; int64_t uid = mr.me.uid;
......
...@@ -1299,6 +1299,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr ...@@ -1299,6 +1299,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SSessionKey startWin = {0}; SSessionKey startWin = {0};
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin); getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin);
if (IS_INVALID_SESSION_WIN_KEY(startWin)) { if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
// char* tmp = streamStateSessionDump(pInfo->windowSup.pStreamAggSup->pState);
// qInfo("%s", tmp);
// taosMemoryFree(tmp);
// window has been closed. // window has been closed.
continue; continue;
} }
......
...@@ -380,7 +380,7 @@ typedef struct SGroupKeyInfo { ...@@ -380,7 +380,7 @@ typedef struct SGroupKeyInfo {
numOfElem += 1; \ numOfElem += 1; \
pStddevRes->count -= 1; \ pStddevRes->count -= 1; \
sumT -= plist[i]; \ sumT -= plist[i]; \
pStddevRes->quadraticISum -= plist[i] * plist[i]; \ pStddevRes->quadraticISum -= (int64_t)(plist[i] * plist[i]); \
} \ } \
} while (0) } while (0)
...@@ -2526,8 +2526,9 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { ...@@ -2526,8 +2526,9 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
// might be a race condition here that pHisto can be overwritten or setup function // might be a race condition here that pHisto can be overwritten or setup function
// has not been called, need to relink the buffer pHisto points to. // has not been called, need to relink the buffer pHisto points to.
buildHistogramInfo(pInfo); buildHistogramInfo(pInfo);
qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p", __FUNCTION__, qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p",
numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems); __FUNCTION__, numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto,
pInfo->pHisto->elems);
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
......
...@@ -497,7 +497,7 @@ SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSess ...@@ -497,7 +497,7 @@ SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSess
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL);
int32_t c = 0; int32_t c = -2;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c); tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c);
if (c != 0) { if (c != 0) {
...@@ -727,3 +727,43 @@ _end: ...@@ -727,3 +727,43 @@ _end:
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
} }
#if 0
char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
tdbTbcMoveToFirst(pCur->pCur);
SSessionKey key = {0};
int32_t code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) {
return NULL;
}
int32_t size = 2048;
char* dumpBuf = taosMemoryCalloc(size, 1);
int64_t len = 0;
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
while (1) {
tdbTbcMoveToNext(pCur->pCur);
key = (SSessionKey){0};
code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) {
return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
}
return dumpBuf;
}
#endif
...@@ -599,12 +599,62 @@ if $loop_count == 20 then ...@@ -599,12 +599,62 @@ if $loop_count == 20 then
endi endi
if $rows != 4 then if $rows != 4 then
print =====rows=$rows print ======rows=$rows
print =====rows=$rows goto loop15
print =====rows=$rows endi
#goto loop15
sql insert into t1 values(1648791223001,11,2,3,1.0);
sql insert into t2 values(1648791223001,21,2,3,1.0);
sql insert into t3 values(1648791223001,31,2,3,1.0);
sql insert into t4 values(1648791223001,41,2,3,1.0);
sleep 300
sql delete from st where ts = 1648791223001;
$loop_count = 0
loop16:
sleep 50
sql select * from test.streamt5 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
print ======rows=$rows
return -1
endi
if $rows != 4 then
goto loop16
endi
sql insert into t1 values(1648791223001,12,2,3,1.0);
sql insert into t2 values(1648791223001,22,2,3,1.0);
sql insert into t3 values(1648791223001,32,2,3,1.0);
sql insert into t4 values(1648791223001,42,2,3,1.0);
sleep 300
sql delete from st where ts = 1648791223001;
$loop_count = 0
loop17:
sleep 50
sql select * from test.streamt5 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi endi
if $rows != 4 then
print ======rows=$rows
print ======rows=$rows
print ======rows=$rows
return 1
#goto loop17
endi
$loop_all = $loop_all + 1 $loop_all = $loop_all + 1
print ============loop_all=$loop_all print ============loop_all=$loop_all
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册