提交 6a978be7 编写于 作者: dengyihao's avatar dengyihao

opt mem

上级 5abeacc2
......@@ -157,6 +157,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTab
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
......@@ -3775,7 +3775,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false;
......@@ -347,6 +347,8 @@ typedef struct STableMergeScanInfo {
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SArray* queryConds; // array of queryTableDataCond
STsdbReader* pReader;
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
......@@ -42,7 +42,7 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
static char* SYSTABLE_IDX_EXCEPT[] = {"db_name", "vgroup_id"};
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result);
typedef int32_t (*__sys_check)(SNode* cond);
......@@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo));
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset,
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
......@@ -376,9 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -3248,18 +3249,80 @@ static int tableUidCompare(const void* a, const void* b) {
return u1 < u2 ? -1 : 1;
typedef struct MergeIndex {
int idx;
int len;
} MergeIndex;
static FORCE_INLINE int optSysBinarySearch(SArray* arr, int s, int e, uint64_t k) {
uint64_t v;
int32_t m;
while (s <= e) {
m = s + (e - s) / 2;
v = *(uint64_t*)taosArrayGet(arr, m);
if (v >= k) {
e = m - 1;
} else {
s = m + 1;
return s;
void optSysIntersection(SArray* in, SArray* out) {
int32_t sz = (int32_t)taosArrayGetSize(in);
if (sz <= 0) {
MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
for (int i = 0; i < sz; i++) {
SArray* t = taosArrayGetP(in, i);
mi[i].len = (int32_t)taosArrayGetSize(t);
mi[i].idx = 0;
SArray* base = taosArrayGetP(in, 0);
for (int i = 0; i < taosArrayGetSize(base); i++) {
uint64_t tgt = *(uint64_t*)taosArrayGet(base, i);
bool has = true;
for (int j = 1; j < taosArrayGetSize(in); j++) {
SArray* oth = taosArrayGetP(in, j);
int mid = optSysBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt);
if (mid >= 0 && mid < mi[j].len) {
uint64_t val = *(uint64_t*)taosArrayGet(oth, mid);
has = (val == tgt ? true : false);
mi[j].idx = mid;
} else {
has = false;
if (has == true) {
taosArrayPush(out, &tgt);
static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) {
// TODO, find comm mem from mRslt
for (int i = 0; i < taosArrayGetSize(mRslt); i++) {
SArray* aRslt = taosArrayGetP(mRslt, i);
taosArrayAddAll(rslt, aRslt);
SArray* arslt = taosArrayGetP(mRslt, i);
taosArraySort(arslt, tableUidCompare);
optSysIntersection(mRslt, rslt);
return 0;
static int32_t optSysSpecialColumn(SNode* cond) {
SOperatorNode* pOper = (SOperatorNode*)cond;
SColumnNode* pCol = (SColumnNode*)pOper->pLeft;
for (int i = 0; i < sizeof(SYSTABLE_SPECIAL_COL) / sizeof(SYSTABLE_SPECIAL_COL[0]); i++) {
if (0 == strcmp(pCol->colName, SYSTABLE_SPECIAL_COL[i])) {
return 1;
taosArraySort(rslt, tableUidCompare);
taosArrayRemoveDuplicate(rslt, tableUidCompare, NULL);
return 0;
static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
int ret = -1;
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
......@@ -3283,7 +3346,6 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
SNodeList* pList = (SNodeList*)pNode->pParameterList;
int32_t len = LIST_LENGTH(pList);
if (len <= 0) return ret;
bool hasIdx = false;
bool hasRslt = true;
......@@ -3294,12 +3356,16 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
if (cell == NULL) break;
SArray* aRslt = taosArrayInit(16, sizeof(int64_t));
ret = optSysTabFilteImpl(arg, cell->pNode, aRslt);
if (ret == 0) {
// has index
hasIdx = true;
if (optSysSpecialColumn(cell->pNode) == 0) {
taosArrayPush(mRslt, &aRslt);
} else {
// db_name/vgroup not result
} else if (ret == -2) {
// current vg
hasIdx = true;
......@@ -4169,6 +4235,131 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle,
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
STsdbReader** ppReader, const char* idstr) {
STsdbReader* pReader = NULL;
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
if (code != 0) {
return code;
*ppReader = pReader;
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
uint64_t uid = pBlock->info.uid;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
*status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
SDataBlockInfo* pBlockInfo = &pBlock->info;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
// clear all data in pBlock that are set when handing the previous block
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
pcol->pData = NULL;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
STsdbReader* reader = pTableScanInfo->pReader;
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
if (!pColMatchInfo->needOutput) {
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
STsdbReader* reader = pTableScanInfo->pReader;
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
if (pCols == NULL) {
return terrno;
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
// todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
......@@ -4291,9 +4482,137 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
int64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlockTemp(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
SArray* subTable = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subTable, taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex));
SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, subTable, &pInfo->pReader, GET_TASKID(pTaskInfo));
STsdbReader* reader = pInfo->pReader;
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(reader, &binfo);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.type = binfo.type;
pBlock->info.uid = binfo.uid;
pBlock->info.window = binfo.window;
pBlock->info.rows = binfo.rows;
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
return NULL;
static SSDataBlock* getTableDataBlock2(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
int64_t uid = source->uid;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
int64_t st = taosGetTimestampUs();
STsdbReader* reader = pTableScanInfo->pReader;
while (tsdbTableNextDataBlock(reader, uid)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(reader, &binfo);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.type = binfo.type;
pBlock->info.uid = binfo.uid;
pBlock->info.window = binfo.window;
pBlock->info.rows = binfo.rows;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
return NULL;
static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
......@@ -4372,6 +4691,14 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
return pList;
int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) {
dst->colList[i] = src->colList[i];
return 0;
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -4392,9 +4719,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableEndIdx = pInfo->tableEndIndex;
STableListInfo* tableListInfo = pInfo->tableListInfo;
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, GET_TASKID(pTaskInfo));
pInfo->pReader = NULL;
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
......@@ -4403,18 +4730,27 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
for (int32_t i = 0; i < numOfTable; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i + tableStartIdx);
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
for (int32_t i = 0; i < numReaders; ++i) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
taosArrayPush(pInfo->sortSourceParams, &param);
SQueryTableDataCond cond;
dumpSQueryTableCond(&pInfo->cond, &cond);
taosArrayPush(pInfo->queryConds, &cond);
for (int32_t i = 0; i < numReaders; ++i) {
for (int32_t i = 0; i < numOfTable; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
ps->param = param;
......@@ -4548,6 +4884,8 @@ void destroyTableMergeScanOperatorInfo(void* param) {
if (pTableScanInfo->matchInfo.pList != NULL) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册