未验证 提交 13c713d3 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22290 from taosdata/3.0_main

merge 3.0 into branch main
...@@ -841,48 +841,6 @@ typedef enum { ...@@ -841,48 +841,6 @@ typedef enum {
READ_MODE_ALL, READ_MODE_ALL,
} EReadMode; } EReadMode;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema *pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct {
SArray *pTombData;
} STableLoadInfo;
struct SDataFileReader;
typedef struct SCacheRowsReader {
STsdb *pTsdb;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
SVnode *pVnode;
STSchema *pSchema;
STSchema *pCurrSchema;
uint64_t uid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
SArray *pCidList;
int32_t *pSlotIds;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
uint64_t *uidList;
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STFileSet *pCurFileSet;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;
} SCacheRowsReader;
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;
int8_t dirty; int8_t dirty;
...@@ -892,14 +850,10 @@ typedef struct { ...@@ -892,14 +850,10 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
...@@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); ...@@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
...@@ -1020,11 +1020,11 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache ...@@ -1020,11 +1020,11 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype); code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype);
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
}
if (remainCols) { if (remainCols) {
taosArrayDestroy(remainCols); taosArrayDestroy(remainCols);
} }
}
return code; return code;
} }
...@@ -1592,10 +1592,51 @@ _err: ...@@ -1592,10 +1592,51 @@ _err:
return code; return code;
} }
static void freeTableInfoFunc(void *param) {
void **p = (void **)param;
taosMemoryFreeClear(*p);
}
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
if (!pReader->pTableMap) {
pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
}
STableLoadInfo *pInfo = NULL;
STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
if (!ppInfo) {
pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
return pInfo;
}
return *ppInfo;
}
static uint64_t *getUidList(SCacheRowsReader *pReader) {
if (!pReader->uidList) {
int32_t numOfTables = pReader->numOfTables;
pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = pReader->pTableList[i].uid;
pReader->uidList[i] = uid;
}
taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}
return pReader->uidList;
}
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader, static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
bool isFile) { bool isFile) {
int32_t code = 0; int32_t code = 0;
uint64_t *uidList = pReader->uidList; uint64_t *uidList = getUidList(pReader);
int32_t numOfTables = pReader->numOfTables; int32_t numOfTables = pReader->numOfTables;
int64_t suid = pReader->info.suid; int64_t suid = pReader->info.suid;
...@@ -1618,7 +1659,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea ...@@ -1618,7 +1659,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
} }
uint64_t uid = uidList[j]; uint64_t uid = uidList[j];
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
if (pInfo->pTombData == NULL) { if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
} }
...@@ -1660,13 +1701,16 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea ...@@ -1660,13 +1701,16 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
} }
if (newTable) { if (newTable) {
pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); pInfo = getTableLoadInfo(pReader, uid);
if (pInfo->pTombData == NULL) { if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
} }
} }
if (record.version <= pReader->info.verRange.maxVer) { if (record.version <= pReader->info.verRange.maxVer) {
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData); taosArrayPush(pInfo->pTombData, &delData);
} }
...@@ -1792,12 +1836,10 @@ struct CacheNextRowIter; ...@@ -1792,12 +1836,10 @@ struct CacheNextRowIter;
typedef struct SFSNextRowIter { typedef struct SFSNextRowIter {
SFSNEXTROWSTATES state; // [input] SFSNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input] SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input] STSchema *pTSchema; // [input]
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet; int32_t iFileSet;
STFileSet *pFileSet; STFileSet *pFileSet;
TFileSetArray *aDFileSet; TFileSetArray *aDFileSet;
...@@ -1828,10 +1870,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1828,10 +1870,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
int nCols) { int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter; SFSNextRowIter *state = (SFSNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = state->pr->pTsdb;
if (SFSNEXTROW_FS == state->state) { if (SFSNEXTROW_FS == state->state) {
state->nFileSet = TARRAY2_SIZE(state->aDFileSet); state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
state->iFileSet = state->nFileSet;
state->state = SFSNEXTROW_FILESET; state->state = SFSNEXTROW_FILESET;
} }
...@@ -1850,7 +1892,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1850,7 +1892,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
STFileObj **pFileObj = state->pFileSet->farr; STFileObj **pFileObj = state->pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) { if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
if (state->pFileSet != state->pr->pCurFileSet) { if (state->pFileSet != state->pr->pCurFileSet) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.tsdbPageSize}; SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
const char *filesName[4] = {0}; const char *filesName[4] = {0};
if (pFileObj[0] != NULL) { if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f; conf.files[0].file = *pFileObj[0]->f;
...@@ -1880,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1880,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet; state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader); loadDataTomb(state->pr, state->pr->pFileReader);
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
} }
if (!state->pIndexList) { if (!state->pIndexList) {
...@@ -1887,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1887,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
} else { } else {
taosArrayClear(state->pIndexList); taosArrayClear(state->pIndexList);
} }
const TBrinBlkArray *pBlkArray = NULL;
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray); const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) { for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
SBrinBlk *pBrinBlk = &pBlkArray->data[i]; SBrinBlk *pBrinBlk = &pBlkArray->data[i];
...@@ -1920,8 +1963,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1920,8 +1963,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet; state->pr->pCurFileSet = state->pFileSet;
} }
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid, code = lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid, state->pr,
state->pr, state->lastTs, aCols, nCols); state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -2341,7 +2384,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2341,7 +2384,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsState.pRowIter = pIter; pIter->fsState.pRowIter = pIter;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pReadSnap->pfSetArray; pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->fsState.pTSchema = pTSchema; pIter->fsState.pTSchema = pTSchema;
...@@ -2458,14 +2500,17 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI ...@@ -2458,14 +2500,17 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
uint64_t uid = pIter->idx.uid; uint64_t uid = pIter->idx.uid;
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid)); STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
SArray *pTombData = pInfo->pTombData; if (pInfo->pTombData == NULL) {
if (pTombData) { pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
taosArrayAddAll(pTombData, pIter->pMemDelData);
code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
} }
taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData);
size_t delSize = TARRAY_SIZE(pInfo->pTombData);
if (delSize > 0) {
code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
}
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
...@@ -133,21 +134,6 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf ...@@ -133,21 +134,6 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 < pu2) ? -1 : 1;
}
}
static void freeTableInfoFunc(void* param) {
void** p = (void**)param;
taosMemoryFreeClear(*p);
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) { SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
...@@ -173,27 +159,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -173,27 +159,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->numOfTables = numOfTables; p->numOfTables = numOfTables;
p->pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (p->pTableMap == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (p->uidList == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = p->pTableList[i].uid;
p->uidList[i] = uid;
STableLoadInfo* pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
tSimpleHashSetFreeFp(p->pTableMap, freeTableInfoFunc);
taosSort(p->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
int32_t code = setTableSchema(p, suid, idstr); int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p); tsdbCacherowsReaderClose(p);
...@@ -216,14 +181,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -216,14 +181,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
} }
} }
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
int32_t numOfStt = pCfg->sttTrigger;
p->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
if (p->pLDataIterArray == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->idstr = taosStrdup(idstr); p->idstr = taosStrdup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL); taosThreadMutexInit(&p->readerMutex, NULL);
...@@ -250,9 +207,11 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -250,9 +207,11 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pCurrSchema); taosMemoryFree(p->pCurrSchema);
if (p->pLDataIterArray) {
int64_t loadBlocks = 0; int64_t loadBlocks = 0;
double elapse = 0; double elapse = 0;
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse); destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
}
if (p->pFileReader) { if (p->pFileReader) {
tsdbDataFileReaderClose(&p->pFileReader); tsdbDataFileReaderClose(&p->pFileReader);
...@@ -318,7 +277,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -318,7 +277,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
bool hasRes = false; bool hasRes = false;
SArray* pLastCols = NULL;
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) { if (pRes == NULL) {
...@@ -327,14 +285,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -327,14 +285,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
for (int32_t j = 0; j < pr->numOfCols; ++j) { for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes +
VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN; p->ts = INT64_MIN;
} }
pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol)); taosThreadMutexLock(&pr->readerMutex);
code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
STableKeyInfo* pTableList = pr->pTableList;
// retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
SArray* pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
if (pLastCols == NULL) { if (pLastCols == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -351,33 +318,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -351,33 +318,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p); taosArrayPush(pLastCols, &p);
} }
taosThreadMutexLock(&pr->readerMutex);
code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
// retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int64_t totalLastTs = INT64_MAX; int64_t totalLastTs = INT64_MAX;
for (int32_t i = 0; i < pr->numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; tb_uid_t uid = pTableList[i].uid;
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
SLastCol* pColVal = taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue; continue;
} }
...@@ -400,9 +348,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -400,9 +348,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
p->ts = pColVal->ts; p->ts = pColVal->ts;
if (k == 0) { if (k == 0) {
if (TARRAY_SIZE(pTableUidList) == 0) { if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid); taosArrayPush(pTableUidList, &uid);
} else { } else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid); taosArraySet(pTableUidList, 0, &uid);
} }
} }
...@@ -437,32 +385,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -437,32 +385,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
} }
if (hasRes) { if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
} }
taosArrayDestroyEx(pLastCols, freeItem);
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
tb_uid_t uid = pr->pTableList[i].uid; tb_uid_t uid = pTableList[i].uid;
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype); tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) { if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue; continue;
} }
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayClearEx(pRow, freeItem); taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
taosArrayPush(pTableUidList, &uid); taosArrayPush(pTableUidList, &uid);
...@@ -478,11 +419,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -478,11 +419,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
_end: _end:
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true); tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
int64_t loadBlocks = 0;
double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
taosThreadMutexUnlock(&pr->readerMutex); taosThreadMutexUnlock(&pr->readerMutex);
if (pRes != NULL) { if (pRes != NULL) {
...@@ -492,9 +428,7 @@ _end: ...@@ -492,9 +428,7 @@ _end:
} }
taosMemoryFree(pRes); taosMemoryFree(pRes);
// taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroy(pRow); taosArrayDestroy(pRow);
taosArrayDestroyEx(pLastCols, freeItem);
return code; return code;
} }
...@@ -790,7 +790,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { ...@@ -790,7 +790,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
pMTree->ignoreEarlierTs = false; pMTree->ignoreEarlierTs = false;
// todo handle other level of stt files, here only deal with the first level stt
int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size; int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
if (size == 0) { if (size == 0) {
goto _end; goto _end;
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
static int32_t uidComparFunc(const void* p1, const void* p2) { int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1; uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2; uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) { if (pu1 == pu2) {
......
...@@ -36,6 +36,16 @@ typedef enum { ...@@ -36,6 +36,16 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3, EXTERNAL_ROWS_NEXT = 0x3,
} EContentData; } EContentData;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct SBlockInfoBuf { typedef struct SBlockInfoBuf {
int32_t currentIndex; int32_t currentIndex;
SArray* pData; SArray* pData;
...@@ -215,6 +225,8 @@ typedef struct SBrinRecordIter { ...@@ -215,6 +225,8 @@ typedef struct SBrinRecordIter {
SBrinRecord record; SBrinRecord record;
} SBrinRecordIter; } SBrinRecordIter;
int32_t uidComparFunc(const void* p1, const void* p2);
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
...@@ -241,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM ...@@ -241,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
typedef struct {
SArray* pTombData;
} STableLoadInfo;
struct SDataFileReader;
typedef struct SCacheRowsReader {
STsdb* pTsdb;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
SVnode* pVnode;
STSchema* pSchema;
STSchema* pCurrSchema;
uint64_t uid;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
SArray* pCidList;
int32_t* pSlotIds;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo* pTableList; // table id list
int32_t numOfTables;
uint64_t* uidList;
SSHashObj* pTableMap;
SArray* pLDataIterArray;
struct SDataFileReader* pFileReader;
STFileSet* pCurFileSet;
const TBrinBlkArray* pBlkArray;
STsdbReadSnap* pReadSnap;
char* idstr;
int64_t lastTs;
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册