提交 2cc2e5d5 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 874b6916
......@@ -1065,12 +1065,9 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
qWarn("offset: idx: %d, val: %d", j, rowEntryOffset[j]);
if (!isRowEntryInitialized(pResInfo)) {
qWarn("no result");
continue;
} else {
qWarn("has result");
}
if (pRow->numOfRows < pResInfo->numOfRes) {
......
......@@ -2352,8 +2352,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
int stepTrace = 0;
qWarn("step1 %d", stepTrace++);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
......@@ -2366,17 +2364,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
}
while (1) {
qWarn("step1 %d", stepTrace++);
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
qWarn("step1 %d", stepTrace++);
break;
}
continue;
}
qWarn("step1 %d", stepTrace++);
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true;
......@@ -2407,7 +2402,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
ignore = false;
}
}
qWarn("step1 %d", stepTrace++);
if (ignore) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
......@@ -2417,27 +2411,22 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue;
}
}
qWarn("step1 %d", stepTrace++);
int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
qWarn("step1 %d", stepTrace++);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
qWarn("step1 %d", stepTrace++);
if (IS_FINAL_OP(pInfo)) {
forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
qWarn("step1 %d", stepTrace++);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
}
qWarn("step1 %d", stepTrace++);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SWinKey key = {
.ts = pResult->win.skey,
......@@ -2446,7 +2435,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
}
qWarn("step1 %d", stepTrace++);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
......@@ -2455,7 +2443,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
.groupId = groupId,
};
qWarn("step1 %d", stepTrace++);
saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, &key, pResult);
if (pInfo->delKey.ts > key.ts) {
......@@ -2474,7 +2461,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
}
}
qWarn("step1 %d", stepTrace++);
if (IS_FINAL_OP(pInfo)) {
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
......@@ -2483,7 +2469,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
}
if (startPos < 0) {
qWarn("step1 %d", stepTrace++);
break;
}
}
......
......@@ -333,6 +333,8 @@ int streamInitBackend(SStreamState* pState, char* path) {
pState->pTdbState->rocksdb = db;
pState->pTdbState->pHandle = cfHandle;
pState->pTdbState->wopts = rocksdb_writeoptions_create();
// rocksdb_writeoptions_
rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->wopts, 1);
pState->pTdbState->ropts = rocksdb_readoptions_create();
return 0;
}
......@@ -341,6 +343,8 @@ void streamCleanBackend(SStreamState* pState) {
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
}
rocksdb_writeoptions_destroy(pState->pTdbState->wopts);
rocksdb_readoptions_destroy(pState->pTdbState->ropts);
rocksdb_close(pState->pTdbState->rocksdb);
}
......@@ -352,7 +356,18 @@ int streamGetInit(const char* funcName) {
}
return -1;
}
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
bool valid = false;
rocksdb_iter_seek(iter, buf, len);
if (!rocksdb_iter_valid(iter)) {
rocksdb_iter_seek_for_prev(iter, buf, len);
if (!rocksdb_iter_valid(iter)) {
return valid;
}
}
valid = true;
return valid;
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
......@@ -542,14 +557,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
// char toString[128] = {0};
// stateSessionKeyToString(&sKey, toString);
// qWarn("streamState seek key %s", toString);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
streamStateFreeCur(pCur);
return NULL;
}
int32_t c = 0;
......@@ -582,13 +593,18 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
// rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
// if (!rocksdb_iter_valid(pCur->iter)) {
// rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
// if (!rocksdb_iter_valid(pCur->iter)) {
// streamStateFreeCur(pCur);
// return NULL;
// }
// }
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
......@@ -617,14 +633,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
......@@ -721,14 +736,13 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
streamStateFreeCur(pCur);
return NULL;
}
if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen;
SWinKey curKey;
......@@ -837,13 +851,10 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
if (rocksdb_iter_valid(pCur->iter)) {
......@@ -871,14 +882,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
{
SWinKey curKey;
size_t kLen = 0;
......@@ -905,13 +914,9 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
{
......@@ -956,13 +961,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
int32_t c = 0;
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return -1;
}
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return -1;
}
int32_t kLen;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册