未验证 提交 511ff713 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21445 from taosdata/fix/TD-24373

fix(cache/read): associate tbname with primary ts column
......@@ -404,16 +404,47 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
// if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value);
}
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
......@@ -434,39 +465,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(value);
}
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value);
}
}
}
//}
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);
......@@ -474,7 +474,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, false, false, false);
rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit:
......@@ -3010,17 +3010,17 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (COL_VAL_IS_NONE(pColVal)) {
/*if (COL_VAL_IS_NONE(pColVal)) {
if (!setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
} else {
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex);
}
} else {*/
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex);
}
//}
}
if (!setNoneCol) {
// done, goto return pColArray
......
......@@ -343,6 +343,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
hasRes = true;
p->ts = pColVal->ts;
if (k == 0) {
if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
singleTableLastTs = pColVal->ts;
}
......@@ -373,12 +381,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册