提交 1e23f805 编写于 作者: M Minglei Jin

tsdb/cache: use empty array as negative entry to empty table's last/lr

上级 7d26a821
......@@ -91,12 +91,10 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
}
}
taosLRUCacheRelease(pCache, h, invalidate);
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
taosLRUCacheErase(pCache, key, keyLen);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
......@@ -124,12 +122,10 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
}
}
taosLRUCacheRelease(pCache, h, invalidate);
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
taosLRUCacheErase(pCache, key, keyLen);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
......@@ -208,6 +204,44 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TS
int16_t nCol = taosArrayGetSize(pLast);
int16_t iCol = 0;
if (nCol <= 0) {
nCol = pTSchema->numOfCols;
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
for (iCol = 1; iCol < nCol; ++iCol) {
SColVal colVal = {0};
tsdbRowGetColVal(row, pTSchema, iCol, &colVal);
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
}
if (taosArrayPush(pLast, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
}
goto _invalidate;
}
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
......@@ -279,6 +313,44 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb
int16_t nCol = taosArrayGetSize(pLast);
int16_t iCol = 0;
if (nCol <= 0) {
nCol = pTSchema->numOfCols;
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
if (taosArrayPush(pLast, &(SLastCol){.ts = keyTs, .colVal = tColVal}) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
for (iCol = 1; iCol < nCol; ++iCol) {
SColVal colVal = {0};
tsdbRowGetColVal(row, pTSchema, iCol, &colVal);
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
}
if (taosArrayPush(pLast, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _invalidate;
}
}
goto _invalidate;
}
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
......@@ -1215,12 +1287,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
// build the result ts row here
*dup = false;
if (taosArrayGetSize(pColArray) != nCol) {
*ppColArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppColArray = pColArray;
}
// if (taosArrayGetSize(pColArray) != nCol) {
//*ppColArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
*ppColArray = pColArray;
//}
nextRowIterClose(&iter);
// taosMemoryFreeClear(pTSchema);
......@@ -1333,12 +1405,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
}
} while (setNoneCol);
if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppLastArray = pColArray;
}
// if (taosArrayGetSize(pColArray) <= 0) {
//*ppLastArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
*ppLastArray = pColArray;
//}
nextRowIterClose(&iter);
// taosMemoryFreeClear(pTSchema);
......@@ -1369,8 +1441,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
SArray *pArray = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
// if table's empty or error, return code of -1
if (code < 0 || pArray == NULL) {
// if table's empty or error, set handle NULL and return
if (code < 0 /* || pArray == NULL*/) {
if (!dup && pArray) {
taosArrayDestroy(pArray);
}
......@@ -1388,20 +1460,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
// taosThreadMutexUnlock(&pTsdb->lruMutex);
// h = taosLRUCacheLookup(pCache, key, keyLen);
} // else {
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
//}
}
*handle = h;
return code;
}
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
// int32_t code = 0;
// int16_t nCol = taosArrayGetSize(pLastArray);
......@@ -1442,8 +1508,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
if (!h) {
SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray, pr);
// if table's empty or error, return code of -1
if (code < 0 || pLastArray == NULL) {
// if table's empty or error, set handle NULL and return
if (code < 0 /* || pLastArray == NULL*/) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
......@@ -1457,13 +1523,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
// taosThreadMutexUnlock(&pTsdb->lruMutex);
// h = taosLRUCacheLookup(pCache, key, keyLen);
} // else {
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
//}
}
*handle = h;
......
......@@ -250,7 +250,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
return code;
}
if (h == NULL) {
if (h == NULL || taosArrayGetSize(pRow) <= 0) {
continue;
}
......@@ -319,7 +319,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
return code;
}
if (h == NULL) {
if (h == NULL || taosArrayGetSize(pRow) <= 0) {
continue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册