提交 da9df994 编写于 作者: H Haojun Liao

fix(query): opt last query.

上级 2c656a2c
......@@ -78,6 +78,27 @@ typedef struct {
int32_t exprIdx;
} STupleKey;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
STupleKey* pTuple1 = (STupleKey*)pKey1;
STupleKey* pTuple2 = (STupleKey*)pKey2;
......
......@@ -306,6 +306,11 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache ==============================================================================================
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
......
......@@ -15,11 +15,6 @@
#include "tsdb.h"
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
......
......@@ -29,7 +29,7 @@ typedef struct SCacheRowsReader {
SArray* pTableList; // table id list
} SCacheRowsReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
......@@ -37,20 +37,35 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE);
SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
pRes->ts = pColVal->ts;
pRes->bytes = TSDB_KEYSIZE;
pRes->isNull = false;
pRes->hasResult = true;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
} else {
int32_t slotId = slotIds[i];
tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
int32_t bytes = pReader->pSchema->columns[slotId].bytes;
pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes);
pRes->bytes = bytes;
pRes->hasResult = true;
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (!COL_VAL_IS_VALUE(&colVal)) {
colDataAppendNULL(pColInfoData, numOfRows);
pRes->isNull = true;
pRes->ts = pColVal->ts;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
} else {
varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
varDataSetLen(pRes->buf, colVal.value.nData);
memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData);
pRes->bytes = colVal.value.nData;
colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
......@@ -117,7 +132,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
return NULL;
}
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow,
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
......@@ -127,9 +142,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
}
// no data in the table of Uid
if (*h != NULL) {
//*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h);
if (*h != NULL) { // todo convert to SArray
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
......@@ -139,8 +153,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
// no data in the table of Uid
if (*h != NULL) {
SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
}
......@@ -157,13 +170,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
int32_t code = TSDB_CODE_SUCCESS;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols);
for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
lastTs[i] = INT64_MIN;
}
// retrieve the only one last row of all tables in the uid list.
if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
......@@ -176,7 +193,53 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
{
SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
for(int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes);
pRes[j]->ts = INT64_MIN;
}
for (int32_t k = 0; k < pr->numOfCols; ++k) {
SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k);
if (slotIds[k] == -1) { // the primary timestamp
SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k);
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
pRes[k]->hasResult = true;
pRes[k]->ts = pColVal->ts;
memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE);
colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
}
} else {
int32_t slotId = slotIds[k];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
pRes[k]->hasResult = true;
pRes[k]->ts = pColVal->ts;
pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
if (!pRes[k]->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
} else {
memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes);
}
}
colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
}
}
}
}
/*
if (pRow->ts > lastKey) {
printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey);
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
......@@ -189,7 +252,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
internalResult = true;
lastKey = pRow->ts;
}
*/
tsdbCacheRelease(lruCache, h);
}
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
......
......@@ -57,16 +57,6 @@ typedef struct SAvgRes {
int16_t type; // store the original input type, used in merge function
} SAvgRes;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
......@@ -93,17 +83,6 @@ typedef struct STopBotRes {
STopBotResItem* pItems;
} STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
typedef struct SStddevRes {
double result;
int64_t count;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册