提交 c8b180b1 编写于 作者: H Haojun Liao

fix(query): update the cached last query.

上级 109d50a4
......@@ -29,48 +29,37 @@ typedef struct SCacheRowsReader {
SArray* pTableList; // table id list
} SCacheRowsReader;
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
SFirstLastRes** pRes) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
SColVal colVal = {0};
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE);
SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i);
if (slotIds[i] == -1) {
pRes->ts = pColVal->ts;
pRes->bytes = TSDB_KEYSIZE;
pRes->isNull = false;
pRes->hasResult = true;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
if (slotIds[i] == -1) { // the primary timestamp
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
pRes[i]->ts = pColVal->ts;
memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE);
} else {
int32_t slotId = slotIds[i];
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
int32_t bytes = pReader->pSchema->columns[slotId].bytes;
pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes);
pRes->bytes = bytes;
pRes->hasResult = true;
pRes[i]->ts = pColVal->ts;
pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (!COL_VAL_IS_VALUE(&colVal)) {
pRes->isNull = true;
pRes->ts = pColVal->ts;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
if (!pRes[i]->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
} else {
varDataSetLen(pRes->buf, colVal.value.nData);
memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData);
pRes->bytes = colVal.value.nData;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
}
}
pRes[i]->hasResult = true;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
pBlock->info.rows += 1;
......@@ -142,7 +131,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
}
// no data in the table of Uid
if (*h != NULL) { // todo convert to SArray
if (*h != NULL) {
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
} else {
......@@ -172,15 +161,16 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
LRUHandle* h = NULL;
SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false;
int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols);
for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
lastTs[i] = INT64_MIN;
SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes);
pRes[j]->ts = INT64_MIN;
}
// retrieve the only one last row of all tables in the uid list.
if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
......@@ -194,18 +184,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
{
SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
for(int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes);
pRes[j]->ts = INT64_MIN;
}
for (int32_t k = 0; k < pr->numOfCols; ++k) {
SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k);
if (slotIds[k] == -1) { // the primary timestamp
SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k);
if (slotIds[k] == -1) { // the primary timestamp
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
hasRes = true;
pRes[k]->hasResult = true;
pRes[k]->ts = pColVal->ts;
memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE);
......@@ -217,6 +202,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
hasRes = true;
pRes[k]->hasResult = true;
pRes[k]->ts = pColVal->ts;
......@@ -236,25 +222,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
/*
if (pRow->ts > lastKey) {
printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey);
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
taosArrayClear(pTableUidList);
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
internalResult = true;
lastKey = pRow->ts;
}
*/
tsdbCacheRelease(lruCache, h);
}
if (hasRes) {
pResBlock->info.rows = 1;
}
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
......@@ -267,9 +241,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
saveOneRow(pRow, pResBlock, pr, slotIds, pRes);
// TODO reset the pRes
taosArrayPush(pTableUidList, &pKeyInfo->uid);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
......
......@@ -193,6 +193,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->currentGroupIndex += 1;
// check for tag values
// TODO NOTE: The uid of pInfo->pRes is required.
if (pInfo->pRes->info.rows > 0) {
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
......
......@@ -1420,8 +1420,9 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
SResultRowInfo dumyInfo = {0};
dumyInfo.cur.pageId = -1;
STimeWindow win = {0};
if (IS_FINAL_OP(pInfo)) {
win.skey = startTsCols[i];
......@@ -5828,27 +5829,30 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->isFinal = false;
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pSup = &pOperator->exprSupp;
initBasicInfo(&pInfo->binfo, pResBlock);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
pInfo->invertible = false;
......
......@@ -2402,7 +2402,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = cachedLastRowFunction,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize
},
{
......
......@@ -6148,99 +6148,6 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t interpFunction(SqlFunctionCtx* pCtx) {
#if 0
int32_t fillType = (int32_t) pCtx->param[2].i64;
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
if (pCtx->start.key == pCtx->startTs) {
assert(pCtx->start.key != INT64_MIN);
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
goto interp_success_exit;
} else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
goto interp_success_exit;
}
switch (fillType) {
case TSDB_FILL_NULL:
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
break;
case TSDB_FILL_SET_VALUE:
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
break;
case TSDB_FILL_LINEAR:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = pCtx->start.key, .val = &v1};
SPoint point2 = {.key = pCtx->end.key, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
break;
case TSDB_FILL_PREV:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
break;
case TSDB_FILL_NEXT:
if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
break;
case TSDB_FILL_NONE:
// do nothing
default:
goto interp_exit;
}
interp_success_exit:
*(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs;
INC_INIT_VAL(pCtx, 1);
interp_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->endTs = pCtx->startTs;
#endif
return TSDB_CODE_SUCCESS;
}
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册