提交 2ef8cd9a 编写于 作者: M Minglei Jin

fix: invalidate last cache when it's covered by delete range

上级 1711d646
......@@ -155,7 +155,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */
}
}
} else {
} /*else {
if (dup) {
cacheRow = tdRowDup(row);
} else {
......@@ -168,7 +168,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
}*/
return code;
}
......@@ -992,7 +992,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
// if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = TSKEY_MAX, .colVal = *pColVal}) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -1127,7 +1127,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
bool dup = false;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
......@@ -1139,7 +1139,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return 0;
}
tsdbCacheInsertLastrow(pCache, pTsdb, uid, pRow, dup);
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
// tsdbCacheInsertLastrow(pCache, pTsdb, uid, pRow, dup);
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
......@@ -1202,7 +1209,7 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
/* tsdbCacheInsertLast(pCache, uid, pRow); */
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
......@@ -1235,9 +1242,21 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
// clear last cache anyway, no matter where eKey ends.
taosLRUCacheRelease(pCache, h, true);
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册