提交 a127e915 编写于 作者: M Minglei Jin

tsdb/cache skyline: first round implementation of cache skyline

上级 9f9efe9a
...@@ -853,6 +853,10 @@ typedef struct STsdbReaderInfo { ...@@ -853,6 +853,10 @@ typedef struct STsdbReaderInfo {
int16_t order; int16_t order;
} STsdbReaderInfo; } STsdbReaderInfo;
typedef struct {
SArray *pTombData;
} STableLoadInfo;
typedef struct SCacheRowsReader { typedef struct SCacheRowsReader {
STsdb *pTsdb; STsdb *pTsdb;
STsdbReaderInfo info; STsdbReaderInfo info;
...@@ -869,6 +873,8 @@ typedef struct SCacheRowsReader { ...@@ -869,6 +873,8 @@ typedef struct SCacheRowsReader {
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list STableKeyInfo *pTableList; // table id list
int32_t numOfTables; int32_t numOfTables;
uint64_t *uidList;
SSHashObj *pTableMap;
SArray *pLDataIterArray; SArray *pLDataIterArray;
STsdbReadSnap *pReadSnap; STsdbReadSnap *pReadSnap;
char *idstr; char *idstr;
......
...@@ -1694,7 +1694,90 @@ _err: ...@@ -1694,7 +1694,90 @@ _err:
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader, static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
bool isFile) { bool isFile) {
int32_t code = 0; int32_t code = 0;
uint64_t *uidList = pReader->uidList;
int32_t numOfTables = pReader->numOfTables;
int64_t suid = pReader->info.suid;
for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
STombBlk *pTombBlk = &pTombBlkArray->data[i];
if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
continue;
}
if (pTombBlk->minTbid.suid > suid ||
(pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
break;
}
STombBlock block = {0};
code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
: tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
uint64_t uid = uidList[j];
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
STombRecord record = {0};
bool finished = false;
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
finished = true;
break;
}
if (record.suid < suid) {
continue;
}
if (record.suid > suid) {
finished = true;
break;
}
bool newTable = false;
if (uid < record.uid) {
while (j < numOfTables && uidList[j] < record.uid) {
++j;
newTable = true;
}
if (j >= numOfTables) {
finished = true;
break;
}
uid = uidList[j];
}
if (record.uid < uid) {
continue;
}
if (newTable) {
pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);
}
}
tTombBlockDestroy(&block);
if (finished) {
return code;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1911,7 +1994,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1911,7 +1994,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pLastIter = &state->lastIter; state->pLastIter = &state->lastIter;
// TODO: load tomb data from data and stt loadDataTomb(state->pr, state->pFileReader);
if (!state->pIndexList) { if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk)); state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
...@@ -2524,16 +2607,17 @@ typedef struct { ...@@ -2524,16 +2607,17 @@ typedef struct {
} TsdbNextRowState; } TsdbNextRowState;
typedef struct CacheNextRowIter { typedef struct CacheNextRowIter {
SArray *pMemDelData; SArray *pMemDelData;
SArray *pSkyline; SArray *pSkyline;
int64_t iSkyline; int64_t iSkyline;
SBlockIdx idx; SBlockIdx idx;
SMemNextRowIter memState; SMemNextRowIter memState;
SMemNextRowIter imemState; SMemNextRowIter imemState;
SFSNextRowIter fsState; SFSNextRowIter fsState;
TSDBROW memRow, imemRow, fsLastRow, fsRow; TSDBROW memRow, imemRow, fsLastRow, fsRow;
TsdbNextRowState input[4]; TsdbNextRowState input[3];
STsdb *pTsdb; SCacheRowsReader *pr;
STsdb *pTsdb;
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
...@@ -2555,9 +2639,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2555,9 +2639,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->pMemDelData = NULL; pIter->pMemDelData = NULL;
loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
#if 0 #if 0
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelFile *pDelFile = pReadSnap->fs.pDelFile; SDelFile *pDelFile = pReadSnap->fs.pDelFile;
if (pDelFile) { if (pDelFile) {
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
...@@ -2589,22 +2673,11 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2589,22 +2673,11 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline); code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
if (code) goto _err; if (code) goto _err;
} }
#endif
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
#endif
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
/*
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pReadSnap->pfSetArray;
pIter->fsLastState.pTSchema = pTSchema;
pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid;
pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs;
pIter->fsLastState.pDataIter = pLDataIter;
*/
pIter->fsState.pRowIter = pIter; pIter->fsState.pRowIter = pIter;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
...@@ -2613,7 +2686,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2613,7 +2686,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsState.pTSchema = pTSchema; pIter->fsState.pTSchema = pTSchema;
pIter->fsState.suid = suid; pIter->fsState.suid = suid;
pIter->fsState.uid = uid; pIter->fsState.uid = uid;
// pIter->fsState.pDataFReader = pDataFReader;
pIter->fsState.lastTs = lastTs; pIter->fsState.lastTs = lastTs;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
...@@ -2641,7 +2713,7 @@ pIter->input[2] = (TsdbNextRowState){ ...@@ -2641,7 +2713,7 @@ pIter->input[2] = (TsdbNextRowState){
pIter->input[1].next = true; pIter->input[1].next = true;
} }
return code; pIter->pr = pr;
_err: _err:
return code; return code;
} }
...@@ -2724,7 +2796,19 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI ...@@ -2724,7 +2796,19 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
for (int i = 0; i < nMax; ++i) { for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]); TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
// TODO: build skyline here if (!pIter->pSkyline) {
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
uint64_t uid = pIter->idx.uid;
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid));
SArray *pTombData = pInfo->pTombData;
taosArrayAddAll(pTombData, pIter->pMemDelData);
code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
}
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline); bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
if (!deleted) { if (!deleted) {
iMerge[nMerge] = iMax[i]; iMerge[nMerge] = iMax[i];
......
...@@ -132,6 +132,21 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf ...@@ -132,6 +132,21 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 < pu2) ? -1 : 1;
}
}
static void freeTableInfoFunc(void* param) {
void** p = (void**)param;
taosMemoryFreeClear(*p);
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) { SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
...@@ -157,6 +172,27 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -157,6 +172,27 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->numOfTables = numOfTables; p->numOfTables = numOfTables;
p->pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (p->pTableMap == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (p->uidList == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = p->pTableList[i].uid;
p->uidList[i] = uid;
STableLoadInfo* pInfo = taosMemoryMalloc(sizeof(STableLoadInfo));
tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
tSimpleHashSetFreeFp(p->pTableMap, freeTableInfoFunc);
taosSort(p->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
int32_t code = setTableSchema(p, suid, idstr); int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p); tsdbCacherowsReaderClose(p);
...@@ -220,6 +256,20 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -220,6 +256,20 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree((void*)p->idstr); taosMemoryFree((void*)p->idstr);
taosThreadMutexDestroy(&p->readerMutex); taosThreadMutexDestroy(&p->readerMutex);
if (p->pTableMap) {
void* pe = NULL;
int32_t iter = 0;
while ((pe = tSimpleHashIterate(p->pTableMap, pe, &iter)) != NULL) {
STableLoadInfo* pInfo = *(STableLoadInfo**)pe;
pInfo->pTombData = taosArrayDestroy(pInfo->pTombData);
}
tSimpleHashCleanup(p->pTableMap);
}
if (p->uidList) {
taosMemoryFree(p->uidList);
}
taosMemoryFree(pReader); taosMemoryFree(pReader);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册