提交 b8ac811f 编写于 作者: L liuyao

fix bug

上级 5850a3ab
...@@ -88,6 +88,8 @@ int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); ...@@ -88,6 +88,8 @@ int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState); int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number); void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
......
...@@ -36,6 +36,7 @@ int32_t scanDebug = 0; ...@@ -36,6 +36,7 @@ int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000 #define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
typedef struct STableMergeScanExecInfo { typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder; SFileBlockLoadRecorder blockRecorder;
...@@ -1771,7 +1772,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { ...@@ -1771,7 +1772,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
// other properties are recovered from the execution plan // other properties are recovered from the execution plan
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
if (!pBuff) { if (!pBuff || len == 0) {
return; return;
} }
...@@ -2054,10 +2055,12 @@ FETCH_NEXT_BLOCK: ...@@ -2054,10 +2055,12 @@ FETCH_NEXT_BLOCK:
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, consume block %d", totBlockNum); qDebug("stream scan return empty, consume block %d", totBlockNum);
// void* buff = NULL; void* buff = NULL;
// int32_t len = streamScanOperatorEncode(pInfo, &buff); int32_t len = streamScanOperatorEncode(pInfo, &buff);
// todo(liuyao) save buff if (len > 0) {
// taosMemoryFreeClear(buff); streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len);
}
taosMemoryFreeClear(buff);
return NULL; return NULL;
} }
...@@ -2484,12 +2487,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2484,12 +2487,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->twAggSup.maxTs = INT64_MIN; pInfo->twAggSup.maxTs = INT64_MIN;
pInfo->pState = NULL; pInfo->pState = NULL;
// todo(liuyao) get buff from rocks db;
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
streamScanOperatorDeocde(buff, len, pInfo); streamScanOperatorDeocde(buff, len, pInfo);
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
......
...@@ -2846,6 +2846,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin ...@@ -2846,6 +2846,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
} }
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
} }
......
...@@ -395,6 +395,32 @@ int32_t streamStateClear(SStreamState* pState) { ...@@ -395,6 +395,32 @@ int32_t streamStateClear(SStreamState* pState) {
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
int32_t code = 0;
void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
if (code != 0) {
return code;
}
code = streamStatePutBatch_rocksdb(pState, batch);
streamStateDestroyBatch(batch);
return code;
#else
return 0;
#endif
}
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
#ifdef USE_ROCKSDB
int32_t code = 0;
code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
return code;
#else
return 0;
#endif
}
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStateGet(pState, key, pVal, pVLen); return streamStateGet(pState, key, pVal, pVLen);
...@@ -1066,7 +1092,7 @@ void streamStateDestroy(SStreamState* pState) { ...@@ -1066,7 +1092,7 @@ void streamStateDestroy(SStreamState* pState) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
streamFileStateDestroy(pState->pFileState); streamFileStateDestroy(pState->pFileState);
streamStateDestroy_rocksdb(pState); streamStateDestroy_rocksdb(pState);
taosMemoryFreeClear(pState->parNameMap); tSimpleHashCleanup(pState->parNameMap);
// do nothong // do nothong
#endif #endif
taosMemoryFreeClear(pState->pTdbState); taosMemoryFreeClear(pState->pTdbState);
......
...@@ -274,7 +274,10 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { ...@@ -274,7 +274,10 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
} }
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
ASSERT(pInfo); if(!pInfo) {
return 0;
}
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
......
...@@ -48,23 +48,34 @@ sleep 100 ...@@ -48,23 +48,34 @@ sleep 100
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
$loop_count = 0
loop0:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
if $rows != 1 then if $rows != 1 then
return -1 goto loop0
endi endi
if $data01 != 234 then if $data01 != 234 then
return -1 goto loop0
endi endi
if $data02 != 234 then if $data02 != 234 then
return -1 goto loop0
endi endi
if $data03 != 234 then if $data03 != 234 then
return -1 goto loop0
endi endi
#=================================================================== #===================================================================
...@@ -77,36 +88,47 @@ sleep 100 ...@@ -77,36 +88,47 @@ sleep 100
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
$loop_count = 0
loop1:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13
if $rows != 2 then if $rows != 2 then
return -1 goto loop1
endi endi
if $data01 != 234 then if $data01 != 234 then
return -1 goto loop1
endi endi
if $data02 != 234 then if $data02 != 234 then
return -1 goto loop1
endi endi
if $data03 != 234 then if $data03 != 234 then
return -1 goto loop1
endi endi
if $data11 != -111 then if $data11 != -111 then
return -1 goto loop1
endi endi
if $data12 != -111 then if $data12 != -111 then
return -1 goto loop1
endi endi
if $data13 != -111 then if $data13 != -111 then
return -1 goto loop1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册