提交 bd2caded 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 d88664a3
......@@ -181,6 +181,11 @@ typedef struct SDataBlockInfo {
int32_t tid;
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t len;
int32_t numOfRows;
} SFileBlockInfo;
typedef struct {
void *pTable;
TSKEY lastKey;
......@@ -341,6 +346,8 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
......
......@@ -55,7 +55,6 @@ static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, in
assert(rowOffset >= 0 && pQuery != NULL);
int32_t numOfRows = GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
// return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId;
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
......@@ -71,8 +70,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
bool isPointInterpoQuery(SQuery *pQuery);
typedef struct {
SArray* pResult; // SArray<SResPair>
int32_t colId;
......@@ -82,6 +79,16 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param);
typedef struct {
int64_t numOfTables;
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
} STableBlockDist;
void blockDistInfoToBinary(SBufferWriter* bw, STableBlockDist* pDist);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
......
......@@ -61,13 +61,6 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
typedef struct {
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
char *result;
} STableBlockDist;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
......@@ -199,6 +192,7 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static bool isPointInterpoQuery(SQuery *pQuery);
// setup the output buffer for each operator
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
......@@ -3738,7 +3732,6 @@ void queryCostStatis(SQInfo *pQInfo) {
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist != NULL) {
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
free(pTableBlockDist->result);
free(pTableBlockDist);
}
}
......@@ -3765,12 +3758,13 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist == NULL) {
return;
}
#if 0
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
SDataBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->rows;
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->numOfRows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
......@@ -3778,18 +3772,16 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
int sz = sprintf(pTableBlockDist->result,
"summery: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]",
getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
min, max, avg,
totalRows, totalBlocks,
pTableBlockDist->firstSeekTimeUs,
pTableBlockDist->numOfRowsInMemTable);
UNUSED(sz);
return;
}
#endif
// int sz = sprintf(pTableBlockDist->result,
// "summary: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]",
// getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
// getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
// min, max, avg,
// totalRows, totalBlocks,
// pTableBlockDist->firstSeekTimeUs,
// pTableBlockDist->numOfRowsInMemTable);
}
//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
//
......@@ -4412,6 +4404,26 @@ static SSDataBlock* doSeqTableBlocksScan(void* param) {
pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle;
pTableScanInfo->externalLoaded = false;
}
}
static SSDataBlock* doBlockInfoScan(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*)param;
STableScanInfo *pTableScanInfo = pOperator->info;
STableBlockDist tableBlockDist = {0};
tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, tableBlockDist.dataBlockInfos);
tableBlockDist.numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle);
SSDataBlock* pBlock = &pTableScanInfo->block;
pBlock->info.rows = 1;
pBlock->info.numOfCols = 1;
}
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
......@@ -5390,40 +5402,23 @@ void tableQueryImpl(SQInfo *pQInfo) {
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
}
void buildTableBlockDistResult(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
// pQuery->pos = 0;
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo));
pTableBlockDist->result = (char *)malloc(512);
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
SSchema blockDistSchema = tGetBlockDistColumnSchema();
int64_t startTime = taosGetTimestampUs();
while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) {
if (isQueryKilled(pRuntimeEnv->qinfo)) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
if (pTableBlockDist->firstSeekTimeUs == 0) {
pTableBlockDist->firstSeekTimeUs = taosGetTimestampUs() - startTime;
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
taosArrayPush(pTableBlockDist->dataBlockInfos, &blockInfo);
}
if (terrno != TSDB_CODE_SUCCESS) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, terrno);
}
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
// int64_t startTime = taosGetTimestampUs();
tsdbGetFileBlocksDistInfo(pQueryHandle, pTableBlockDist->dataBlockInfos);
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
generateBlockDistResult(pTableBlockDist);
// generateBlockDistResult(pTableBlockDist);
int type = -1;
assert(pQuery->numOfOutput == 1);
......@@ -5432,6 +5427,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
type = blockDistSchema.type;
}
assert(type == TSDB_DATA_TYPE_BINARY);
// STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
}
......
......@@ -243,6 +243,7 @@ void* destroyResultRowPool(SResultRowPool* p) {
return NULL;
}
// TODO refactor
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) {
uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes);
tbufWriteUint32(bw, numOfGroup);
......@@ -572,10 +573,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
incNextGroup(pGroupResInfo);
}
// if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) {
// SET_STABLE_QUERY_OVER(pRuntimeEnv);
// }
int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo,
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
......@@ -583,3 +580,26 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS;
}
/*
* typedef struct {
int64_t numOfTables;
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
} STableBlockDist;
*
*/
SBufferWriter* blockDistInfoToBinary(STableBlockDist* pDist) {
SBufferWriter bw = tbufInitWriter(NULL, false);
tbufWriteUint64(&bw, pDist->numOfTables);
tbufWriteUint64(&bw, pDist->numOfRowsInMemTable);
tbufWriteUint64(&bw, taosArrayGetSize(pDist->dataBlockInfos));
pDist->dataBlockInfos->pData
}
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
}
......@@ -78,8 +78,8 @@ typedef struct STableCheckInfo {
} STableCheckInfo;
typedef struct STableBlockInfo {
SBlock* compBlock;
STableCheckInfo* pTableCheckInfo;
SBlock *compBlock;
STableCheckInfo *pTableCheckInfo;
} STableBlockInfo;
typedef struct SBlockOrderSupporter {
......@@ -2006,6 +2006,89 @@ static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false;
}
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
// find the start data block in file
pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pQueryHandle->fileIter, pFileHandle, pQueryHandle->order);
tsdbFSIterSeek(&pQueryHandle->fileIter, fid);
tsdbUnLockFS(pFileHandle);
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pQueryHandle->pTableCheckInfo);
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
numOfBlocks = 0;
tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb));
if ((pQueryHandle->pFileGroup = tsdbFSIterNext(&pQueryHandle->fileIter)) == NULL) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
break;
}
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle,
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo);
pQueryHandle->pFileGroup = NULL;
break;
}
if (tsdbSetAndOpenReadFSet(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
code = terrno;
break;
}
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
if (tsdbLoadBlockIdx(&pQueryHandle->rhelper) < 0) {
code = terrno;
break;
}
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables,
pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo);
if (numOfBlocks == 0) {
continue;
}
SFileBlockInfo info = {0};
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
info.numOfRows = pBlock[j].numOfRows;
info.len = pBlock[j].len;
taosArrayPush(pBlockInfo, &info);
}
}
}
return code;
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
SQueryFilePos* cur = &pQueryHandle->cur;
......@@ -2284,41 +2367,6 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (++pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
return true;
}
return false;
}
if (pQueryHandle->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册