提交 f7c77172 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -137,6 +137,7 @@ void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
// SDelIdx
int32_t tCmprDelIdx(void const *lhs, void const *rhs);
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
// SDelData
......@@ -225,7 +226,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow);
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
// structs =======================
......
......@@ -81,7 +81,247 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
return code;
}
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) {
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
tb_uid_t suid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
return 0;
}
if (mr.me.type == TSDB_CHILD_TABLE) {
suid = mr.me.ctbEntry.suid;
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
suid = 0;
} else {
suid = 0;
}
metaReaderClear(&mr);
return suid;
}
/*
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow **ppRow) {
int32_t code = 0;
if (mem) {
STbData *pMem = NULL;
STbDataIter* iter; // mem buffer skip list iterator
tsdbGetTbDataFromMemTable(mem, suid, uid, &pMem);
if (pMem != NULL) {
tsdbTbDataIterCreate(pMem, NULL, 1, &iter);
if (iter != NULL) {
TSDBROW *row = tsdbTbDataIterGet(iter);
tsdbTbDataIterDestroy(iter);
}
}
} else {
*ppRow = NULL;
}
return code;
}
*/
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
int32_t code = 0;
SMapData delDataMap;
SDelData delData;
if (pDelIdx) {
tMapDataReset(&delDataMap);
code = tsdbReadDelData(pDelReader, pDelIdx, &delDataMap, NULL);
if (code) goto _err;
for (int32_t iDelData = 0; iDelData < delDataMap.nItem; ++iDelData) {
code = tMapDataGetItemByIdx(&delDataMap, iDelData, &delData, tGetDelData);
if (code) goto _err;
taosArrayPush(aDelData, &delData);
}
}
_err:
return code;
}
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
int32_t code = 0;
SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
for (; pDelData; pDelData = pDelData->pNext) {
taosArrayPush(aDelData, pDelData);
}
return code;
}
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
int32_t code = 0;
if (pMem) {
code = getTableDelDataFromTbData(pMem, aDelData);
if (code) goto _err;
}
if (pIMem) {
code = getTableDelDataFromTbData(pIMem, aDelData);
if (code) goto _err;
}
if (pDelIdx) {
code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
if (code) goto _err;
}
_err:
return code;
}
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aSkyline) {
int32_t code = 0;
SArray *aDelData = taosArrayInit(32, sizeof(SDelData));
code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData);
if (code) goto _err;
size_t nDelData = taosArrayGetSize(aDelData);
code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
if (code) goto _err;
taosArrayDestroy(aDelData);
_err:
return code;
}
static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) {
int32_t code = 0;
SMapData delIdxMap;
SDelIdx idx = {.suid = suid, .uid = uid};
tMapDataReset(&delIdxMap);
code = tsdbReadDelIdx(pDelFReader, &delIdxMap, NULL);
if (code) goto _err;
code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
if (code) goto _err;
_err:
return code;
}
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet,
SArray *pSkyline,
STsdb *pTsdb,
STSRow **pLastRow) {
int32_t code = 0;
TSDBROW *pMemRow = NULL;
TSDBROW *pIMemRow = NULL;
if (iter != NULL) {
pMemRow = tsdbTbDataIterGet(iter);
}
if (iter != NULL) {
pIMemRow = tsdbTbDataIterGet(iiter);
}
SDataFReader *pDataFReader;
code = tsdbDataFReaderOpen(&pDataFReader, pTsdb, pFileSet);
if (code) goto _err;
SMapData blockIdxMap;
tMapDataReset(&blockIdxMap);
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
if (code) goto _err;
SBlockData *pBlockData;
tsdbDataFReaderClose(pDataFReader);
_err:
return code;
}
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL;
STbData *pIMem = NULL;
STbDataIter iter; // mem buffer skip list iterator
STbDataIter iiter; // imem buffer skip list iterator
if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
if (pMem != NULL) {
tsdbTbDataIterOpen(pMem, NULL, 1, &iter);
}
}
if (pTsdb->imem) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
if (pIMem != NULL) {
tsdbTbDataIterOpen(pIMem, NULL, 1, &iiter);
}
}
*ppRow = NULL;
SDelFReader *pDelFReader;
//code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
if (code) goto _err;
SDelIdx delIdx;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
if (code) goto _err;
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
if (code) goto _err;
/*
SFSIter fsiter;
bool fsHasNext = false;
tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter);
do {
*/
SDFileSet *pFileSet = NULL;
//pFileSet = tsdbFSIterGet(fsiter);
code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow);
if (code < 0) {
goto _err;
}
if (*ppRow != NULL) {
//break;
}
/*
} while (fsHasNext = tsdbFSIterNext(fsiter))
*/
tsdbDelFReaderClose(pDelFReader);
return code;
_err:
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
......@@ -91,9 +331,12 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) {
if (h) {
*ppRow = (STSRow *) taosLRUCacheValue(pCache, h);
} else {
// TODO: load lastrow from mem, imem, and files
// if table's empty, return code of -1
code = -1;
STSRow *pRow = NULL;
code = mergeLastRow(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
}
}
return code;
......
......@@ -52,7 +52,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
for (int32_t i = 0; i < numOfTables; ++i) {
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -77,7 +77,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
for (int32_t i = 0; i < numOfTables; ++i) {
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -440,6 +440,25 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
}
// SDelIdx ======================================================
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
SDelIdx *lDelIdx = *(SDelIdx **)lhs;
SDelIdx *rDelIdx = *(SDelIdx **)rhs;
if (lDelIdx->suid < lDelIdx->suid) {
return -1;
} else if (lDelIdx->suid > lDelIdx->suid) {
return 1;
}
if (lDelIdx->uid < lDelIdx->uid) {
return -1;
} else if (lDelIdx->uid > lDelIdx->uid) {
return 1;
}
return 0;
}
int32_t tPutDelIdx(uint8_t *p, void *ph) {
SDelIdx *pDelIdx = (SDelIdx *)ph;
int32_t n = 0;
......@@ -1125,4 +1144,4 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
_err:
return code;
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册