提交 b62fa20b 编写于 作者: M Minglei Jin

tsdb/cache: return last row as array of SLastCol

上级 977fc4b1
...@@ -61,8 +61,6 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { ...@@ -61,8 +61,6 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
*len = sizeof(uint64_t); *len = sizeof(uint64_t);
} }
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); } static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
...@@ -75,13 +73,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { ...@@ -75,13 +73,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) { if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) { bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true); taosLRUCacheRelease(pCache, h, true);
} else { } else {
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} }
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
} }
...@@ -130,14 +138,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { ...@@ -130,14 +138,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) { if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) { bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true); taosLRUCacheRelease(pCache, h, true);
} else { } else {
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} }
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
} }
// getTableCacheKey(uid, "l", key, &keyLen); // getTableCacheKey(uid, "l", key, &keyLen);
...@@ -177,6 +194,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST ...@@ -177,6 +194,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
getTableCacheKey(uid, 0, key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) { if (h) {
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
TSKEY keyTs = row->ts;
bool invalidate = false;
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
int16_t nCol = taosArrayGetSize(pLast);
int16_t iCol = 0;
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
}
for (++iCol; iCol < nCol; ++iCol) {
SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs >= tTsVal1->ts) {
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
if (!COL_VAL_IS_NONE(&colVal)) {
if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
invalidate = true;
break;
}
} else {
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
}
}
}
_invalidate:
taosMemoryFreeClear(pTSchema);
taosLRUCacheRelease(pCache, h, invalidate);
/*
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) { if (row->ts >= cacheRow->ts) {
if (row->ts == cacheRow->ts) { if (row->ts == cacheRow->ts) {
...@@ -218,9 +275,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST ...@@ -218,9 +275,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
code = -1; code = -1;
} }
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */ // tsdbCacheInsertLastrow(pCache, uid, row, dup);
} }
} }*/
} /*else { } /*else {
if (dup) { if (dup) {
cacheRow = tdRowDup(row); cacheRow = tdRowDup(row);
...@@ -1031,7 +1088,7 @@ _err: ...@@ -1031,7 +1088,7 @@ _err:
return code; return code;
} }
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
int32_t code = 0; int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
...@@ -1055,12 +1112,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1055,12 +1112,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
break; break;
} }
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) { if (lastRowTs == TSKEY_MAX) {
lastRowTs = TSDBROW_TS(pRow); lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, pColVal) == NULL) { if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -1068,7 +1127,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1068,7 +1127,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
for (iCol = 1; iCol < nCol; ++iCol) { for (iCol = 1; iCol < nCol; ++iCol) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pColArray, pColVal) == NULL) { if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -1079,15 +1138,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1079,15 +1138,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
} }
} }
if (!setNoneCol) { if (!setNoneCol) {
// goto build the result ts row // done, goto return pColArray
break; break;
} else { } else {
continue; continue;
} }
} }
if ((TSDBROW_TS(pRow) < lastRowTs)) { if ((rowTs < lastRowTs)) {
// goto build the result ts row // done, goto return pColArray
break; break;
} }
...@@ -1099,7 +1158,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1099,7 +1158,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pColArray, iCol, pColVal); taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) { } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol; noneCol = iCol;
setNoneCol = true; setNoneCol = true;
...@@ -1110,14 +1169,13 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -1110,14 +1169,13 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
// build the result ts row here // build the result ts row here
*dup = false; *dup = false;
if (taosArrayGetSize(pColArray) == nCol) { if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow); *ppColArray = NULL;
if (code) goto _err; taosArrayDestroy(pColArray);
} else { } else {
*ppRow = NULL; *ppColArray = pColArray;
} }
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
return code; return code;
...@@ -1178,7 +1236,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1178,7 +1236,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
} }
} }
if (!setNoneCol) { if (!setNoneCol) {
// goto build the result ts row // done, goto return pColArray
break; break;
} else { } else {
continue; continue;
...@@ -1202,7 +1260,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1202,7 +1260,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
} }
} while (setNoneCol); } while (setNoneCol);
// build the result ts row here
if (taosArrayGetSize(pColArray) <= 0) { if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL; *ppLastArray = NULL;
taosArrayDestroy(pColArray); taosArrayDestroy(pColArray);
...@@ -1233,13 +1290,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1233,13 +1290,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
h = taosLRUCacheLookup(pCache, key, keyLen); h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) { if (!h) {
STSRow *pRow = NULL; SArray *pArray = NULL;
bool dup = false; // which is always false for now bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow); code = mergeLastRow(uid, pTsdb, &dup, &pArray);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pArray == NULL) {
if (!dup && pRow) { if (!dup && pArray) {
taosMemoryFree(pRow); taosArrayDestroy(pArray);
} }
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
...@@ -1249,9 +1306,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1249,9 +1306,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return 0; return 0;
} }
_taos_lru_deleter_t deleter = deleteTableCacheLastrow; _taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); taosLRUCacheInsert(pCache, key, keyLen, pArray, pArray->capacity, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
code = -1; code = -1;
} }
...@@ -1309,7 +1366,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand ...@@ -1309,7 +1366,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
SArray *pLastArray = NULL; SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray); code = mergeLast(uid, pTsdb, &pLastArray);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
// if (code < 0 || pRow == NULL) {
if (code < 0 || pLastArray == NULL) { if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
......
...@@ -128,7 +128,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint ...@@ -128,7 +128,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
// no data in the table of Uid // no data in the table of Uid
if (*h != NULL) { if (*h != NULL) {
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); //*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h);
} }
} else { } else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册