diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a566a1eb14a7e1797352cf8b87bf93a1ced7ce5e..6132a47ac393b3b3f73aebe2a9abc1be9337ca8e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -18,7 +18,8 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; - size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config + // TODO: get cfg from vnode config: pTsdb->pVnode->config.lruCapacity + size_t cfgCapacity = 1024 * 1024; pCache = taosLRUCacheInit(cfgCapacity, -1, .5); if (pCache == NULL) { @@ -61,8 +62,11 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (h) { cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); if (row->ts >= cacheRow->ts) { - if (row->ts > cacheRow->ts) { + if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); + } else { + tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheInsertLastrow(pCache, uid, row); } } } else { @@ -475,13 +479,39 @@ static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) { // TODO: new tsrow from tsdbrow STSRow *ret = NULL; if (pRow->type == 0) { - return pRow->pTSRow; + return tdRowDup(pRow->pTSRow); } else { } return ret; } +static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) { + bool deleted = false; + while (*iSkyline > 0) { + TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline); + TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1); + + if (key->ts > pItemBack->ts) { + return false; + } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { + if ((key->version <= pItemFront->version || key->ts == pItemBack->ts && key->version <= pItemBack->version)) { + return true; + } else { + return false; + } + } else { + if (*iSkyline > 1) { + --*iSkyline; + } else { + return false; + } + } + } + + return deleted; +} + typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); typedef struct TsdbNextRowState { @@ -598,8 +628,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey = TSDBROW_KEY(max[i]); - bool deleted = false; - // bool deleted = tsdbKeyDeleted(maxKey, pSkyline, &iSkyline); + // bool deleted = false; + bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); if (!deleted) { merge[nMerge++] = max[i]; }