提交 26ebf798 编写于 作者: wmmhello's avatar wmmhello

rollbackfix error in mem order desc

上级 36837467
...@@ -5504,20 +5504,35 @@ static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn ...@@ -5504,20 +5504,35 @@ static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
static void tail_function(SQLFunctionCtx *pCtx) { static void tail_function(SQLFunctionCtx *pCtx) {
STailInfo *pRes = getOutputInfo(pCtx); STailInfo *pRes = getOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i) { if (pCtx->stableQuery){
if (pRes->offset++ < (int32_t)pCtx->param[1].i64){ for (int32_t i = 0; i < pCtx->size; ++i) {
continue; char *data = GET_INPUT_DATA(pCtx, i);
}
if (pRes->num >= (int32_t)pCtx->param[0].i64){ TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
break; do_tail_function_add(pRes, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), data, ts,
pCtx->inputBytes, &pCtx->tagInfo, NULL, pCtx->currentStage);
} }
char *data = GET_INPUT_DATA(pCtx, i); }else{
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
if (pRes->offset++ < (int32_t)pCtx->param[1].i64){
continue;
}
if (pRes->num >= (int32_t)pCtx->param[0].i64){ // query complete
pCtx->resultInfo->complete = true;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j];
ctx->resultInfo->complete = true;
}
break;
}
char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage); valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage);
pRes->num++; pRes->num++;
}
} }
// treat the result as only one result // treat the result as only one result
...@@ -5534,7 +5549,7 @@ static void tail_func_merge(SQLFunctionCtx *pCtx) { ...@@ -5534,7 +5549,7 @@ static void tail_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type // the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
do_tail_function_add(pOutput, (int32_t)pCtx->param[0].i64, pInput->res[i]->data, pInput->res[i]->timestamp, do_tail_function_add(pOutput, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), pInput->res[i]->data, pInput->res[i]->timestamp,
pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage); pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage);
} }
...@@ -5555,9 +5570,11 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5555,9 +5570,11 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
int32_t bytes = 0; int32_t bytes = 0;
int32_t type = 0; int32_t type = 0;
int32_t start = 0;
if (pCtx->currentStage == MERGE_STAGE) { if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes; bytes = pCtx->outputBytes;
type = pCtx->outputType; type = pCtx->outputType;
start = pCtx->param[1].i64;
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
} else { } else {
bytes = pCtx->inputBytes; bytes = pCtx->inputBytes;
...@@ -5580,7 +5597,7 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5580,7 +5597,7 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes); qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes);
return; return;
} }
for(int32_t start = pCtx->param[1].i64, i = 0; start < pRes->num; start++, i++){ for(int32_t i = 0; start < pRes->num; start++, i++){
memcpy(data + i * size, pRes->res[start], size); memcpy(data + i * size, pRes->res[start], size);
} }
taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn); taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn);
......
...@@ -38,10 +38,12 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo ...@@ -38,10 +38,12 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) {
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
} }
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) {
return (int32_t)(pQueryAttr->pExpr1[i].base.param[0].i64 + pQueryAttr->pExpr1[i].base.param[1].i64);
}
} }
if (pQueryAttr->uniqueQuery){ if (pQueryAttr->uniqueQuery){
return MAX_UNIQUE_RESULT_ROWS; return MAX_UNIQUE_RESULT_ROWS;
......
...@@ -1855,12 +1855,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1855,12 +1855,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
// pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; } else {
// } else { pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
// pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; }
// }
int32_t colIdOfRow1; int32_t colIdOfRow1;
if(j >= numOfColsOfRow1) { if(j >= numOfColsOfRow1) {
...@@ -1991,12 +1990,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1991,12 +1990,11 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if(forceSetNull) { if(forceSetNull) {
while (i < numOfCols) { // the remain columns are all null data while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
// pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; } else {
// } else { pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
// pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; }
// }
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type); setVardataNull(pData, pColInfo->info.type);
...@@ -2342,7 +2340,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -2342,7 +2340,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
SWAP(cur->win.skey, cur->win.ekey, TSKEY); SWAP(cur->win.skey, cur->win.ekey, TSKEY);
} }
//moveDataToFront(pQueryHandle, numOfRows, numOfCols); moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pQueryHandle); doCheckGeneratedBlockRange(pQueryHandle);
...@@ -2980,14 +2978,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -2980,14 +2978,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
// if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
// int32_t emptySize = maxRowsToRead - numOfRows; int32_t emptySize = maxRowsToRead - numOfRows;
//
// for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
// memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
// } }
// } }
int64_t elapsedTime = taosGetTimestampUs() - st; int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, 0x%"PRIx64, pQueryHandle, tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, 0x%"PRIx64, pQueryHandle,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册