提交 7221f5a6 编写于 作者: H Haojun Liao

[td-227]

上级 7f514df7
...@@ -326,8 +326,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -326,8 +326,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200u // import data #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x800u
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
...@@ -403,23 +403,24 @@ static bool isTopBottomQuery(SQuery *pQuery) { ...@@ -403,23 +403,24 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return false; return false;
} }
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) { static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) {
// for a tag column, no corresponding field info // for a tag column, no corresponding field info
SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].base.colInfo; SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndexEx->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return NULL; return NULL;
} }
/* /*
* Choose the right column field info by field id, since the file block may be out of date, * Choose the right column field info by field id, since the file block may be out of date,
* which means the newest table schema is not equalled to the schema of this block. * which means the newest table schema is not equalled to the schema of this block.
* TODO: speedup by using bsearch
*/ */
for (int32_t i = 0; i < pDataBlockInfo->numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if (pColIndexEx->colId == pStatis[i].colId) { if (pColIndex->colId == pStatis[i].colId) {
return &pStatis[i]; return &pStatis[i];
} }
} }
return NULL; return NULL;
} }
...@@ -431,8 +432,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo ...@@ -431,8 +432,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo
* @param pColStatis * @param pColStatis
* @return * @return
*/ */
static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataStatis *pStatis, SDataStatis **pColStatis) {
SDataStatis **pColStatis) {
SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo; SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return false; return false;
...@@ -444,7 +444,7 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlock ...@@ -444,7 +444,7 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlock
} }
if (pStatis != NULL) { if (pStatis != NULL) {
*pColStatis = getStatisInfo(pQuery, pStatis, pDataBlockInfo, col); *pColStatis = getStatisInfo(pQuery, pStatis, numOfCols, col);
} else { } else {
*pColStatis = NULL; *pColStatis = NULL;
} }
...@@ -936,7 +936,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -936,7 +936,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SDataStatis *tpField = NULL; SDataStatis *tpField = NULL;
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &tpField);
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, hasNull, setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, hasNull,
...@@ -1157,7 +1157,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1157,7 +1157,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SDataStatis *pColStatis = NULL; SDataStatis *pColStatis = NULL;
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &pColStatis); bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &pColStatis);
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, hasNull, setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, hasNull,
...@@ -2455,9 +2455,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2455,9 +2455,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
...@@ -5610,18 +5610,18 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { ...@@ -5610,18 +5610,18 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
continue; continue;
} }
SColIndex *pColIndexEx = &pSqlExprMsg->colInfo; SColIndex *pColIndex = &pSqlExprMsg->colInfo;
if (!TSDB_COL_IS_TAG(pColIndexEx->flag)) { if (!TSDB_COL_IS_TAG(pColIndex->flag)) {
for (int32_t f = 0; f < pQuery->numOfCols; ++f) { for (int32_t f = 0; f < pQuery->numOfCols; ++f) {
if (pColIndexEx->colId == pQuery->colList[f].colId) { if (pColIndex->colId == pQuery->colList[f].colId) {
pColIndexEx->colIndex = f; pColIndex->colIndex = f;
break; break;
} }
} }
} else { } else {
for (int32_t f = 0; f < pQuery->numOfTags; ++f) { for (int32_t f = 0; f < pQuery->numOfTags; ++f) {
if (pColIndexEx->colId == pQuery->tagColList[f].colId) { if (pColIndex->colId == pQuery->tagColList[f].colId) {
pColIndexEx->colIndex = f; pColIndex->colIndex = f;
break; break;
} }
} }
......
...@@ -40,10 +40,6 @@ enum { ...@@ -40,10 +40,6 @@ enum {
TSDB_QUERY_TYPE_EXTERNAL = 3, TSDB_QUERY_TYPE_EXTERNAL = 3,
}; };
typedef struct SField {
// todo need the definition
} SField;
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
...@@ -68,66 +64,56 @@ typedef struct SLoadCompBlockInfo { ...@@ -68,66 +64,56 @@ typedef struct SLoadCompBlockInfo {
} SLoadCompBlockInfo; } SLoadCompBlockInfo;
typedef struct STableCheckInfo { typedef struct STableCheckInfo {
STableId tableId; STableId tableId;
TSKEY lastKey; TSKEY lastKey;
STable* pTableObj; STable* pTableObj;
int32_t start; SCompInfo* pCompInfo;
SCompInfo* pCompInfo; int32_t compSize;
int32_t compSize; int32_t numOfBlocks; // number of qualified data blocks not the original blocks
int32_t numOfBlocks; // number of qualified data blocks not the original blocks SDataCols* pDataCols;
SDataCols* pDataCols; bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iter; // skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
SSkipListIterator* iiter; // imem iterator
bool initBuf; // if we should initialize the in-memory skip list iterator
} STableCheckInfo; } STableCheckInfo;
typedef struct {
SCompBlock* compBlock;
SField* fields;
} SCompBlockFields;
typedef struct STableBlockInfo { typedef struct STableBlockInfo {
SCompBlockFields pBlock; SCompBlock* compBlock;
STableCheckInfo* pTableCheckInfo; STableCheckInfo* pTableCheckInfo;
int32_t blockIndex; // int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of tables */ // int32_t groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo; } STableBlockInfo;
typedef struct SBlockOrderSupporter { typedef struct SBlockOrderSupporter {
int32_t numOfTables; int32_t numOfTables;
STableBlockInfo** pDataBlockInfo; STableBlockInfo** pDataBlockInfo;
int32_t* blockIndexArray; int32_t* blockIndexArray;
int32_t* numOfBlocksPerTable; int32_t* numOfBlocksPerTable;
} SBlockOrderSupporter; } SBlockOrderSupporter;
typedef struct STsdbQueryHandle { typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb; STsdbRepo* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order;
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ STimeWindow window; // the primary query time window that applies to all queries
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SCompBlock* pBlock;
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int16_t order; int32_t numOfBlocks;
STimeWindow window; // the primary query time window that applies to all queries SArray* pColumns; // column list, SColumnInfoData array list
SCompBlock* pBlock; bool locateStart;
int32_t numOfBlocks; int32_t outputCapacity;
SField** pFields; int32_t realNumOfRows;
SArray* pColumns; // column list, SColumnInfoData array list SArray* pTableCheckInfo; //SArray<STableCheckInfo>
bool locateStart; int32_t activeIndex;
int32_t outputCapacity; bool checkFiles; // check file stage
int32_t realNumOfRows; void* qinfo; // query info handle, for debug purpose
SArray* pTableCheckInfo; //SArray<STableCheckInfo> int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
int32_t activeIndex;
bool checkFiles; // check file stage
void* qinfo; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
STableBlockInfo* pDataBlockInfo;
SFileGroup* pFileGroup; SFileGroup* pFileGroup;
SFileGroupIter fileIter; SFileGroupIter fileIter;
SRWHelper rhelper; SRWHelper rhelper;
STableBlockInfo* pDataBlockInfo;
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
} STsdbQueryHandle; } STsdbQueryHandle;
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
...@@ -148,23 +134,43 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable ...@@ -148,23 +134,43 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
// todo 2. add the reference count for each table that is involved in query // todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
// allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols;
pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &colInfo);
pQueryHandle->statis[i].colId = colInfo.info.colId;
}
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
for (int32_t i = 0; i < sizeOfGroup; ++i) { for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group); size_t gsize = taosArrayGetSize(group);
assert(gsize > 0); assert(gsize > 0);
...@@ -174,35 +180,18 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable ...@@ -174,35 +180,18 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
STableCheckInfo info = { STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey, .lastKey = pQueryHandle->window.skey,
.tableId = *id, .tableId = *id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid), .pTableObj = tsdbGetTableByUid(pMeta, id->uid),
}; };
assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid); assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid);
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
} }
} }
uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
/*
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
* in case of descending timestamp order query.
*/
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0;
// allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols;
pQueryHandle->outputCapacity = 4096;
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i]; for(int32_t i = 0; i < numOfCols; ++i) {
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &colInfo);
} }
uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
...@@ -499,8 +488,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -499,8 +488,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false; bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true); SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
if (pCheckInfo->pDataCols == NULL) { // todo: why not the real data? if (pCheckInfo->pDataCols == NULL) {
pCheckInfo->pDataCols = tdNewDataCols(pRepo->tsdbMeta->maxRowBytes, pRepo->tsdbMeta->maxCols, pRepo->config.maxRowsPerFileBlock); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
} }
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
...@@ -522,8 +512,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -522,8 +512,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
} }
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
...@@ -591,9 +579,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -591,9 +579,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->blockCompleted = false; cur->blockCompleted = false;
return; return;
} }
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
taosArrayDestroy(sa);
} else { } else {
pQueryHandle->realNumOfRows = binfo.rows; pQueryHandle->realNumOfRows = binfo.rows;
...@@ -1074,7 +1065,9 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n ...@@ -1074,7 +1065,9 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n
tfree(pSupporter->blockIndexArray); tfree(pSupporter->blockIndexArray);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
tfree(pSupporter->pDataBlockInfo[i]); STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
// tfree(pBlockInfo->statis);
tfree(pBlockInfo);
} }
tfree(pSupporter->pDataBlockInfo); tfree(pSupporter->pDataBlockInfo);
...@@ -1100,14 +1093,14 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* ...@@ -1100,14 +1093,14 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex]; STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex]; STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];
// assert(pLeftBlockInfoEx->pBlock.compBlock->offset != pRightBlockInfoEx->pBlock.compBlock->offset); // assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
if (pLeftBlockInfoEx->pBlock.compBlock->offset == pRightBlockInfoEx->pBlock.compBlock->offset && if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
pLeftBlockInfoEx->pBlock.compBlock->last == pRightBlockInfoEx->pBlock.compBlock->last) { pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
// todo add more information // todo add more information
uError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->pBlock.compBlock->offset); uError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->compBlock->offset);
} }
return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1; return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
} }
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) { static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
...@@ -1116,7 +1109,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1116,7 +1109,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
pQueryHandle->pDataBlockInfo = (STableBlockInfo*)tmp; pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks); memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks);
*numOfAllocBlocks = numOfBlocks; *numOfAllocBlocks = numOfBlocks;
...@@ -1132,9 +1125,10 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1132,9 +1125,10 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
cleanBlockOrderSupporter(&sup, 0); cleanBlockOrderSupporter(&sup, 0);
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
int32_t cnt = 0; int32_t cnt = 0;
int32_t numOfQualTables = 0; int32_t numOfQualTables = 0;
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
if (pTableCheck->numOfBlocks <= 0) { if (pTableCheck->numOfBlocks <= 0) {
...@@ -1153,14 +1147,12 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1153,14 +1147,12 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf; sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) { for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualTables][k]; STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
pBlockInfoEx->pBlock.compBlock = &pBlock[k];
pBlockInfoEx->pBlock.fields = NULL;
pBlockInfoEx->pTableCheckInfo = pTableCheck; pBlockInfo->compBlock = &pBlock[k];
// pBlockInfoEx->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index pBlockInfo->pTableCheckInfo = pTableCheck;
// pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table // pBlockInfo->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index
// pBlockInfo->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table
cnt++; cnt++;
} }
...@@ -1185,8 +1177,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1185,8 +1177,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
int32_t pos = pTree->pNode[0].index; int32_t pos = pTree->pNode[0].index;
int32_t index = sup.blockIndexArray[pos]++; int32_t index = sup.blockIndexArray[pos]++;
STableBlockInfo* pBlocksInfoEx = sup.pDataBlockInfo[pos]; STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfoEx[index]; pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
// set data block index overflow, in order to disable the offset comparator // set data block index overflow, in order to disable the offset comparator
if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) { if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
...@@ -1199,7 +1191,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1199,7 +1191,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
/* /*
* available when no import exists * available when no import exists
* for(int32_t i = 0; i < cnt - 1; ++i) { * for(int32_t i = 0; i < cnt - 1; ++i) {
* assert((*pDataBlockInfo)[i].pBlock.compBlock->offset < (*pDataBlockInfo)[i+1].pBlock.compBlock->offset); * assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
* } * }
*/ */
...@@ -1255,7 +1247,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { ...@@ -1255,7 +1247,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
} }
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
...@@ -1291,10 +1283,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1291,10 +1283,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
} }
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->pBlock.compBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
} }
...@@ -1484,35 +1476,33 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { ...@@ -1484,35 +1476,33 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
// there are data in file // there are data in file
if (pHandle->cur.fid >= 0) { if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; STable* pTable = pBlockInfo->pTableCheckInfo->pTableObj;
STable* pTable = pCheckInfo->pTableObj;
if (pHandle->cur.mixBlock) { SDataBlockInfo blockInfo = {
SDataBlockInfo blockInfo = { .uid = pTable->tableId.uid,
.uid = pTable->tableId.uid, .tid = pTable->tableId.tid,
.tid = pTable->tableId.tid, .rows = pHandle->cur.rows,
.rows = pHandle->cur.rows, .window = pHandle->cur.win,
.window = pHandle->cur.win, .numOfCols = QH_GET_NUM_OF_COLS(pHandle),
}; };
return blockInfo; return blockInfo;
} else {
return getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock);
}
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
SQueryFilePos* cur = &pHandle->cur;
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
if (pTable->mem != NULL) { // create mem table iterator if it is not created yet if (pTable->mem != NULL) { // create mem table iterator if it is not created yet
assert(pCheckInfo->iter != NULL); assert(pCheckInfo->iter != NULL);
STimeWindow* win = &pHandle->cur.win; STimeWindow* win = &cur->win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value // update the last key value
pCheckInfo->lastKey = win->ekey + step; pCheckInfo->lastKey = win->ekey + step;
cur->lastKey = win->ekey + step;
cur->mixBlock = true;
} }
if (!ASCENDING_TRAVERSE(pHandle->order)) { if (!ASCENDING_TRAVERSE(pHandle->order)) {
...@@ -1524,15 +1514,34 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { ...@@ -1524,15 +1514,34 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
.tid = pTable->tableId.tid, .tid = pTable->tableId.tid,
.rows = pHandle->cur.rows, .rows = pHandle->cur.rows,
.window = pHandle->cur.win, .window = pHandle->cur.win,
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
}; };
return blockInfo; return blockInfo;
} }
} }
// return null for data block in cache /*
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
*pBlockStatis = NULL; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
SQueryFilePos* cur = &pHandle->cur;
if (cur->mixBlock) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, QH_GET_NUM_OF_COLS(pHandle));
*pBlockStatis = pHandle->statis;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1546,13 +1555,13 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1546,13 +1555,13 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
if (pHandle->cur.fid < 0) { if (pHandle->cur.fid < 0) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
if (pHandle->cur.mixBlock) { if (pHandle->cur.mixBlock) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock); SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows); assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method // data block has been loaded, todo extract method
...@@ -1562,7 +1571,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1562,7 +1571,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns; return pHandle->pColumns;
} else { // only load the file block } else { // only load the file block
SCompBlock* pBlock = pBlockInfoEx->pBlock.compBlock; SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
// todo refactor // todo refactor
...@@ -2006,8 +2015,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2006,8 +2015,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
} }
taosArrayDestroy(pQueryHandle->pColumns); taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis);
tsdbDestroyHelper(&pQueryHandle->rhelper); tsdbDestroyHelper(&pQueryHandle->rhelper);
tfree(pQueryHandle); tfree(pQueryHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册