提交 40bdfdb2 编写于 作者: H Haojun Liao

[td-225] refactor

上级 61cd3ff9
......@@ -253,6 +253,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
/**
* move to next block if exists but not merge data in memtable
*
......@@ -336,6 +337,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STabl
*/
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
......
......@@ -188,15 +188,15 @@ typedef struct SSDataBlock {
typedef struct SQuery {
SLimitVal limit;
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
SOrderVal order;
......@@ -267,7 +267,7 @@ typedef struct SQueryRuntimeEnv {
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
struct STableScanInfo* pi;
SOperatorInfo* pi;
SSDataBlock *outputBuf;
int32_t groupIndex;
......@@ -329,8 +329,10 @@ typedef struct SQueryParam {
SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef struct STableScanInfo {
SQueryRuntimeEnv* pRuntimeEnv;
SQueryRuntimeEnv *pRuntimeEnv;
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
......@@ -346,15 +348,23 @@ typedef struct STableScanInfo {
SSDataBlock block;
int64_t elapsedTime;
SSDataBlock* (*exec)(void* param);
} STableScanInfo;
/*
 * Generic descriptor of one query operator: a name, a flag, an opaque pointer to the
 * operator-specific state and the function that runs the operator.
 */
typedef struct SOperatorInfo {
// human readable operator name (the table scan operator stores the literal "SeqScanTableOp")
char *name;
// set to false by the table scan operator; presumably marks operators that must consume
// all of their input before producing output -- TODO confirm
bool blockingOptr;
// operator-specific state, kept as an opaque pointer (the table scan operator stores its STableScanInfo here)
void *optInfo;
// entry point of the operator: takes a void* parameter and returns an SSDataBlock*
__operator_fn_t exec;
} SOperatorInfo;
// Fixed table of five operator descriptors.
// NOTE(review): this is a variable definition, not a declaration. If this line lives in a
// header, every translation unit including it defines its own `optrList`, which fails to
// link with -fno-common; consider `extern` here plus one definition in a .c file -- confirm.
SOperatorInfo optrList[5];
/*
 * State carried by the aggregation operator.
 */
typedef struct SAggOperatorInfo {
// result row bookkeeping used by the aggregation
SResultRowInfo *pResultRowInfo;
// query info of the table being aggregated
STableQueryInfo *pTableQueryInfo;
// table scan state; presumably the scan that feeds this operator -- TODO confirm
STableScanInfo *pTableScanInfo;
// presumably the upstream operator whose output this operator consumes -- TODO confirm
SOperatorInfo *prevOptr;
// runtime environment of the query
SQueryRuntimeEnv *pRuntimeEnv;
// function pointer taking a void* parameter and returning an SSDataBlock*
SSDataBlock* (*apply)(void* param);
} SAggOperatorInfo;
void freeParam(SQueryParam *param);
......
......@@ -1767,7 +1767,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) {
// todo opt for null block
static void first_function(SQLFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
if (pCtx->order == TSDB_ORDER_DESC /*|| pCtx->preAggVals.dataBlockLoaded == false*/) {
return;
}
......@@ -1912,7 +1912,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case)
*/
static void last_function(SQLFunctionCtx *pCtx) {
if (pCtx->order != pCtx->param[0].i64 || pCtx->preAggVals.dataBlockLoaded == false) {
if (pCtx->order != pCtx->param[0].i64/* || pCtx->preAggVals.dataBlockLoaded == false*/) {
return;
}
......
......@@ -159,7 +159,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColInfo);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
......@@ -174,7 +174,7 @@ static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery);
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static STableScanInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static int32_t getNumOfScanTimes(SQuery* pQuery);
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
......@@ -432,6 +432,18 @@ static bool hasTagValOutput(SQuery* pQuery) {
return false;
}
/**
 * Decide whether the column described by pColIndex has to be treated as possibly
 * containing NULL values.
 *
 * @param pColIndex column descriptor; its flag and colId are inspected
 * @param pStatis   statistics of the column, or NULL when no statistics are available
 * @return false when the column is a tag column, a user-defined column, has a colId equal to
 *         PRIMARYKEY_TIMESTAMP_COL_INDEX, or when the statistics report zero NULL values;
 *         true in every other case (including missing statistics)
 */
static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) {
  // column kinds that are never reported as holding NULL, whatever the statistics say
  const bool neverNull = TSDB_COL_IS_TAG(pColIndex->flag) ||
                         TSDB_COL_IS_UD_COL(pColIndex->flag) ||
                         (pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
  if (neverNull) {
    return false;
  }

  // without statistics the answer stays on the safe side: NULLs may exist
  return (pStatis == NULL) || (pStatis->numOfNull != 0);
}
/**
* @param pQuery
* @param col
......@@ -1178,27 +1190,28 @@ static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDat
SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
pRuntimeEnv->pCtx[i].pInput = p->pData;
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[i];
pCtx->pInput = p->pData;
uint32_t status = aAggs[pCtx->functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
pCtx->ptsList = tsInfo->pData;
}
}
}
}
}
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY *tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData *pColInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (TSKEY *)(pColInfo->pData);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, tsCols, &pQuery->pExpr1[k]);
}
for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) {
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pQuery->pExpr1[k].base.colInfo);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = pQuery->window.skey;
......@@ -1836,28 +1849,24 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
}
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo) {
SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pSDataBlock->pBlockStatis, &tpField);
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) {
SDataStatis *pStatis = NULL;
if (tpField != NULL) {
if (pSDataBlock->pBlockStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
pStatis = &pSDataBlock->pBlockStatis[pColIndex->colIndex];
pCtx->preAggVals.statis = *pStatis;
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis = *tpField;
assert(pCtx->preAggVals.statis.numOfNull <= pSDataBlock->info.rows);
} else {
pCtx->preAggVals.isSet = false;
}
pCtx->preAggVals.dataBlockLoaded = (pSDataBlock->pDataBlock != NULL);
pCtx->hasNull = hasNullRv(pColIndex, pStatis);
// limit/offset query will affect this value
pCtx->size = pSDataBlock->info.rows;
uint32_t status = aAggs[pCtx->functionId].status;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
pCtx->ptsList = tsCol;
}
// set the statistics data for primary time stamp column
// if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// pCtx->preAggVals.isSet = true;
......@@ -1869,8 +1878,8 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *t
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, SExprInfo* pExprInfo) {
int32_t functionId = pExprInfo->base.functionId;
int32_t colId = pExprInfo->base.colInfo.colId;
// int32_t functionId = pExprInfo->base.functionId;
// int32_t colId = pExprInfo->base.colInfo.colId;
SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pStatis, &tpField);
......@@ -1889,23 +1898,22 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
// set the start position in current block
int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
if (inputData != NULL) {
pCtx->pInput = (char*)inputData + offset * pCtx->inputBytes;
}
uint32_t status = aAggs[functionId].status;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
pCtx->ptsList = tsCol + offset;
}
// int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
// if (inputData != NULL) {
// pCtx->pInput = (char*)inputData + offset * pCtx->inputBytes;
// }
//
// uint32_t status = aAggs[functionId].status;
// if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
// pCtx->ptsList = tsCol + offset;
// }
if (functionId == TSDB_FUNC_SPREAD) { // set the statistics data for primary time stamp column
if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
}
}
// set the statistics data for primary time stamp column
// if (functionId == TSDB_FUNC_SPREAD && colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// pCtx->preAggVals.isSet = true;
// pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
// }
}
// set the output buffer for the selectivity + tag query
......@@ -1954,6 +1962,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -4821,14 +4831,15 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
pQuery->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
if (needReverseScan(pQuery)) {
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
pRuntimeEnv->pi = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
}
if (pTsBuf != NULL) {
......@@ -5798,11 +5809,6 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
continue;
}
// if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
// pTableScanInfo->numOfRows += pBlock->info.rows;
// }
return pBlock;
}
......@@ -5821,17 +5827,16 @@ static SSDataBlock* doTableScan(void* param) {
}
if (++pTableScanInfo->current >= pTableScanInfo->times) {
return NULL;
if (pTableScanInfo->reverseTimes <= 0) {
return NULL;
} else {
break;
}
}
// do prepare for the next round table scan operation
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef);
if (pTableScanInfo->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond);
pRuntimeEnv->resultRowInfo.curIndex = 0;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
......@@ -5849,17 +5854,16 @@ static SSDataBlock* doTableScan(void* param) {
if (pTableScanInfo->reverseTimes > 0) {
setEnvBeforeReverseScan_rv(pRuntimeEnv);
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef);
tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond);
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey);
pTableScanInfo->times = 1;
pTableScanInfo->current = 0;
pTableScanInfo->reverseTimes = 0;
pTableScanInfo->order = cond.order;
SSDataBlock* p = doScanTableImpl(pTableScanInfo);
if (p != NULL) {
......@@ -5870,18 +5874,25 @@ static SSDataBlock* doTableScan(void* param) {
return NULL;
}
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->exec = doTableScan;
pInfo->times = repeatTime;
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
return pInfo;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "SeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->exec = doScanTable;
return pOptr;
}
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
......@@ -5894,6 +5905,8 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ
pInfo->reverseTimes = reverseTime;
pInfo->current = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->pRuntimeEnv = pRuntimeEnv;
return pInfo;
}
......@@ -5924,7 +5937,7 @@ static SSDataBlock* doAggOperator(void* param) {
break;
}
if (countId != getTableScanId(pTableScanInfo)) {
if (countId != getTableScanId(pTableScanInfo) && order == getTableScanOrder(pTableScanInfo)) {
prepareRepeatTableScan(pRuntimeEnv);
countId = getTableScanId(pTableScanInfo);
}
......@@ -5958,6 +5971,16 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) {
return 1;
}
//void createBasicOperatorInfo() {
// optrList[0].name = "SeqScanTableOp";
// optrList[0].blockingOptr = false;
// optrList[0].exec = doTableScan;
//
// optrList[0].name = "SeqScanTableOp";
// optrList[0].blockingOptr = false;
// optrList[0].exec = doTableScan;
//}
static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......
......@@ -287,6 +287,26 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
return pTableCheckInfo;
}
/**
 * Rewind the per-table check info of a query handle.
 *
 * For every entry of pQueryHandle->pTableCheckInfo the last visited key is moved back to
 * the start key of the handle's time window, and both skip-list iterators are handed to
 * tSkipListDestroyIter (the fields keep whatever that call returns).
 *
 * @param pQueryHandle query handle whose table check list holds at least one entry
 */
static void resetCheckInfo(STsdbQueryHandle* pQueryHandle) {
  SArray*     pCheckList = pQueryHandle->pTableCheckInfo;
  const TSKEY startKey   = pQueryHandle->window.skey;

  size_t numOfTables = taosArrayGetSize(pCheckList);
  assert(numOfTables >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pInfo = taosArrayGet(pCheckList, idx);

    pInfo->lastKey = startKey;
    pInfo->iter    = tSkipListDestroyIter(pInfo->iter);
    pInfo->iiter   = tSkipListDestroyIter(pInfo->iiter);

    // sanity check: the rewound key lies on the correct side of the window start
    assert(ASCENDING_TRAVERSE(pQueryHandle->order) ? (pInfo->lastKey >= startKey)
                                                   : (pInfo->lastKey <= startKey));
  }
}
static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) {
size_t si = taosArrayGetSize(pTableCheckInfo);
SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo));
......@@ -405,6 +425,34 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
return (TsdbQueryHandleT) pQueryHandle;
}
/**
 * Re-arm an existing query handle so it can run another scan described by pCond,
 * reusing the handle instead of creating a new one.
 *
 * @param queryHandle handle to reset; used as an STsdbQueryHandle*
 * @param pCond       new query condition; its order, twindow and loadExternalRows are copied
 */
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
  STsdbQueryHandle* pHandle = queryHandle;

  // scan direction, time range and external-row flag come from the new condition
  pHandle->window          = pCond->twindow;
  pHandle->order           = pCond->order;
  pHandle->loadExternalRow = pCond->loadExternalRows;

  // scan progress goes back to its initial state
  pHandle->type        = TSDB_QUERY_TYPE_ALL;
  pHandle->checkFiles  = true;
  pHandle->locateStart = false;
  pHandle->activeIndex = 0;  // current active table index
  pHandle->cur.fid     = -1;
  pHandle->cur.win     = TSWINDOW_INITIALIZER;

  // the window has to be oriented according to the traverse order
  assert(ASCENDING_TRAVERSE(pCond->order) ? (pHandle->window.skey <= pHandle->window.ekey)
                                          : (pHandle->window.skey >= pHandle->window.ekey));

  // clear cached statistics and block load info left over from the previous scan
  // NOTE(review): this zeroes sizeof(SDataStatis) bytes, i.e. a single element; if `statis`
  // holds one entry per column, only the first entry is cleared -- confirm this is intended
  memset(pHandle->statis, 0, sizeof(SDataStatis));
  tsdbInitDataBlockLoadInfo(&pHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pHandle->compBlockLoadInfo);

  resetCheckInfo(pHandle);
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册