提交 23fc0f00 编写于 作者: M Minglei Jin

tsdbCache/last: save ts info of each column

上级 f9c89fdd
......@@ -245,6 +245,8 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// structs =======================
typedef struct {
int minFid;
......
......@@ -59,6 +59,8 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
......@@ -761,7 +763,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
// bool deleted = false;
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) {
// iMerge[nMerge] = i;
......@@ -818,12 +819,22 @@ _err:
return code;
}
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
int32_t code = 0;
SArray *pSkyline = NULL;
STSRow *pRow = NULL;
STSRow **ppRow = &pRow;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
......@@ -837,9 +848,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
}
*ppRow = NULL;
*ppLastArray = NULL;
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx;
......@@ -943,7 +954,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
// bool deleted = false;
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) {
iMerge[nMerge] = iMax[i];
......@@ -970,8 +980,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tRowMergerClear(&merger);
}
} else {
*ppRow = NULL;
return code;
/* *ppRow = NULL; */
/* return code; */
continue;
}
if (iCol == 0) {
......@@ -980,7 +991,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
if (taosArrayPush(pColArray, pColVal) == NULL) {
// if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = TSKEY_MAX, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -991,7 +1003,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for (int16_t i = iCol; i < nCol; ++i) {
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
tTSRowGetVal(*ppRow, pTSchema, i, pColVal);
if (taosArrayPush(pColArray, pColVal) == NULL) {
// if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -1012,11 +1025,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
--nilColCount;
}
}
/*
if (*ppRow) {
taosMemoryFreeClear(*ppRow);
}
*/
continue;
}
......@@ -1024,12 +1037,16 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
for (int16_t i = iCol; i < nCol; ++i) {
SColVal colVal = {0};
tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
TSKEY rowTs = (*ppRow)->ts;
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
// SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pColArray, i);
SColVal *tColVal = &tTsVal->colVal;
if (!colVal.isNone && !colVal.isNull) {
if (tColVal->isNull || tColVal->isNone) {
taosArraySet(pColArray, i, &colVal);
// taosArraySet(pColArray, i, &colVal);
taosArraySet(pColArray, i, &(SLastCol){.ts = rowTs, .colVal = colVal});
--nilColCount;
}
} else {
......@@ -1054,16 +1071,45 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
} while (nilColCount > 0);
// if () new ts row from pColArray if non empty
if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
/* if (taosArrayGetSize(pColArray) == nCol) { */
/* code = tdSTSRowNew(pColArray, pTSchema, ppRow); */
/* if (code) goto _err; */
/* } */
/* taosArrayDestroy(pColArray); */
if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppLastArray = pColArray;
}
if (*ppRow) {
taosMemoryFreeClear(*ppRow);
}
for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter);
}
}
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
_err:
taosArrayDestroy(pColArray);
if (*ppRow) {
taosMemoryFreeClear(*ppRow);
}
for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter);
}
}
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
......@@ -1103,6 +1149,30 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return code;
}
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
int32_t code = 0;
int16_t nCol = taosArrayGetSize(pLastArray);
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol);
SColVal *tColVal = &tTsVal->colVal;
taosArrayPush(pColArray, tColVal);
}
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
taosArrayDestroy(pColArray);
return code;
_err:
taosArrayDestroy(pColArray);
return code;
}
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
......@@ -1115,17 +1185,20 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLast(uid, pTsdb, &pRow);
// STSRow *pRow = NULL;
// code = mergeLast(uid, pTsdb, &pRow);
SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
// if (code < 0 || pRow == NULL) {
if (code < 0 || pLastArray == NULL) {
*handle = NULL;
return 0;
}
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, pLastArray->capacity, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......
......@@ -128,6 +128,8 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
......@@ -140,6 +142,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
lastKey = pRow->ts;
}
// taosMemoryFree(pRow);
tsdbCacheRelease(lruCache, h);
}
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
......@@ -158,8 +161,12 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow(pRow, pResBlock, pr, slotIds);
// taosMemoryFree(pRow);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册