提交 2b9e364f 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 1b230a98
......@@ -74,14 +74,14 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
SSubqueryState *subState = &pParentSql->subState;
//lock in caller
tscDebug("%p total subqueries: %d", pParentSql, subState->numOfSub);
for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) {
tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
tscDebug("%p subquery:%p, index: %d NOT finished, abort query completion check", pParentSql, pParentSql->pSubs[i], i);
done = false;
break;
} else {
tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
tscDebug("%p subquery:%p, index: %d finished", pParentSql, pParentSql->pSubs[i], i);
}
}
......
......@@ -112,13 +112,11 @@ STSBuf* tsBufClone(STSBuf* pTSBuf);
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
......
......@@ -144,7 +144,7 @@ static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, S
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static bool hasMainOutput(SQuery *pQuery);
//static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo);
static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
......@@ -186,6 +186,9 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static bool isPointInterpoQuery(SQuery *pQuery);
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
// setup the output buffer for each operator
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
......@@ -574,7 +577,6 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->intermediateResultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
......@@ -1201,7 +1203,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
// goto _end;
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t forwardStep = 0;
......@@ -1223,7 +1225,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
-1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
......@@ -1238,7 +1244,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
// window start key interpolation
......@@ -1258,7 +1266,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ekey = reviseWindowEkey(pQuery, &nextWin);
......@@ -1313,7 +1321,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here
pInfo->binfo.pCtx[k].size = 1;
int32_t functionId = pInfo->binfo.pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j);
......@@ -1609,8 +1617,6 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) {
qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo);
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -1618,10 +1624,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pQuery = pQuery;
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
calResultBufSize(pQuery, &pRuntimeEnv->resultInfo);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
......@@ -2122,7 +2124,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
static bool doFilterOnBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
static bool doFilterByBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pDataStatis == NULL || pQuery->numOfFilterCols == 0) {
......@@ -2390,7 +2392,43 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
//TODO refactor
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
SQLFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NO_NEEDED;
int32_t numOfOutput = pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
// group by + first/last should not apply the first/last block filter
status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
return status;
}
}
return status;
}
static void doSetFilterColumnInfo(SQuery* pQuery, SSDataBlock* pBlock) {
if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData != NULL) {
return;
}
// set the initial static data value filter expression
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) {
pQuery->pFilterInfo[i].pData = pColInfo->pData;
break;
}
}
}
}
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
......@@ -2399,14 +2437,15 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
SQuery* pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
SQInfo* pQInfo = pRuntimeEnv->qinfo;
SQueryCostInfo* pCost = &pQInfo->summary;
if (pRuntimeEnv->pTsBuf != NULL) {
*status = BLK_DATA_ALL_NEEDED;
(*status) = BLK_DATA_ALL_NEEDED;
if (pQuery->stableQuery) {
if (pQuery->stableQuery) { // todo refactor
SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0];
int16_t tagId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagId);
......@@ -2414,6 +2453,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// compare tag first
tVariant t = {0};
doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes);
setTimestampListJoinInfo(pRuntimeEnv, &t, pQuery->current);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) {
......@@ -2423,58 +2463,35 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
}
}
if (pQuery->numOfFilterCols > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info)) {
*status = BLK_DATA_ALL_NEEDED;
}
if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
SResultRow* pResult = NULL;
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn ||
(QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED;
}
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
// check if this data block is required to load
if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
SResultRow* pResult = NULL;
int32_t numOfOutput = pTableScanInfo->numOfOutput;
SQLFunctionCtx* pCtx = pTableScanInfo->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
if (pQuery->groupbyColumn) {
(*status) = BLK_DATA_ALL_NEEDED;
} else {
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
// group by + first/last should not apply the first/last block filter
if (functionId != TSDB_FUNC_FIRST_DST && functionId != TSDB_FUNC_LAST_DST) {
(*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break;
}
} else {
(*status) |= BLK_DATA_ALL_NEEDED;
break;
}
}
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
if ((*status) == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
......@@ -2502,8 +2519,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockStatis[i].min),
(char*)&(pBlock->pBlockStatis[i].max));
if (!load) {
// current block has been discard due to filter applied
if (!load) { // current block has been discard due to filter applied
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo,
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
......@@ -2515,7 +2531,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
}
// current block has been discard due to filter applied
if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
......@@ -2530,23 +2546,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
return terrno;
}
if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData == NULL) {
// set the initial static data value filter expression
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) {
pQuery->pFilterInfo[i].pData = pColInfo->pData;
break;
}
}
}
}
doSetFilterColumnInfo(pQuery, pBlock);
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) {
filterRowsInDataBlock(pRuntimeEnv, pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock, pRuntimeEnv->pTsBuf,
QUERY_IS_ASC_QUERY(pQuery));
ascQuery);
}
}
......@@ -2623,33 +2626,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) {
tVariantDestroy(tag);
char* val = NULL;
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
char* val = tsdbGetTableName(pTable);
val = tsdbGetTableName(pTable);
assert(val != NULL);
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY);
} else {
char* val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
if (val == NULL) {
tag->nType = TSDB_DATA_TYPE_NULL;
return;
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (isNull(val, type)) {
tag->nType = TSDB_DATA_TYPE_NULL;
return;
}
val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
}
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
} else {
if (isNull(val, type)) {
tag->nType = TSDB_DATA_TYPE_NULL;
return;
}
if (val == NULL || isNull(val, type)) {
tag->nType = TSDB_DATA_TYPE_NULL;
return;
}
tVariantCreateFromBinary(tag, val, bytes, type);
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
} else {
tVariantCreateFromBinary(tag, val, bytes, type);
}
}
......@@ -2679,6 +2672,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
return;
} else {
// set tag value, by which the results are aggregated.
int32_t offset = 0;
......@@ -2705,58 +2699,15 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
offset += pLocalExprInfo->bytes;
}
//todo : use index to avoid iterator all possible output columns
if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) {
//todo : use index to avoid iterator all possible output columns
for(int32_t i = 0; i < numOfOutput; ++i) {
if(pExpr[i].base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
SSqlFuncMsg* pFuncMsg = &pExpr[i].base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult);
for(int32_t j = 0; j < numOfGroup; ++j) {
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) {
int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult);
for(int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pFuncMsg->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo);
}
}
// set the join tag for first column
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
if (pQuery->stableQuery &&
(pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) &&
(pRuntimeEnv->pTsBuf != NULL) &&
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
int16_t tagType = pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz);
} else {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64);
}
}
// set the tsBuf start position before check each data block
if (pRuntimeEnv->pTsBuf != NULL) {
setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
}
}
......@@ -3224,78 +3175,95 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
}
}
//int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
// SQuery* pQuery = pRuntimeEnv->pQuery;
//
// assert(pRuntimeEnv->pTsBuf != NULL);
//
// // both the master and supplement scan needs to set the correct ts comp start position
// tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
//
// if (pTableQueryInfo->cur.vgroupIndex == -1) {
// tVariantAssign(&pTableQueryInfo->tag, pTag);
//
// STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag);
//
// // failed to find data with the specified tag value and vnodeId
// if (!tsBufIsValidElem(&elem)) {
// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
// qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz);
// } else {
// qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64);
// }
//
// return false;
// }
//
// // keep the cursor info of current meter
// pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
// qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
// } else {
// qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
// }
//
// } else {
// tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
//
// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
// qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
// } else {
// qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
// }
// }
//
// return 0;
//}
void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr) {
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
if (pQuery->stableQuery && (pRuntimeEnv->pTsBuf != NULL) &&
(pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) &&
(pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes);
int16_t tagType = pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz);
} else {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo,
pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64);
}
}
}
int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) {
return TSDB_CODE_SUCCESS;
assert(pRuntimeEnv->pTsBuf != NULL);
// both the master and supplement scan needs to set the correct ts comp start position
if (pTableQueryInfo->cur.vgroupIndex == -1) {
tVariantAssign(&pTableQueryInfo->tag, pTag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64);
}
return -1;
}
// Keep the cursor info of current table
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
} else {
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
}
return 0;
}
void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfExprs = pQuery->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &(pExpr[i]);
if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult);
for(int32_t j = 0; j < numOfGroup; ++j) {
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j);
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) {
int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult);
for(int32_t k = 0; k < numOfCols; ++k) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pFuncMsg->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
......@@ -3306,7 +3274,6 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32
}
}
return 0;
}
/*
......@@ -3895,15 +3862,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->vgId = vgId;
pQuery->stableQuery = isSTableQuery;
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQuery, &pRuntimeEnv->resultInfo);
if (onlyQueryTags(pQuery)) {
// TODO refactor.
pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (pQuery->queryBlockDist) {
......@@ -5888,23 +5856,28 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes
const int32_t RESULT_MSG_MIN_ROWS = 8192;
const float RESULT_THRESHOLD_RATIO = 0.85f;
void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query
const int32_t MIN_ROWS_FOR_PRJ_QUERY = 8192;
const int32_t DEFAULT_MIN_ROWS = 4096;
const float THRESHOLD_RATIO = 0.85f;
if (isProjQuery(pQuery)) {
int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->resultRowSize;
if (numOfRes < RESULT_MSG_MIN_ROWS) {
numOfRes = RESULT_MSG_MIN_ROWS;
int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQuery->resultRowSize;
if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
}
pResultInfo->capacity = numOfRes;
pResultInfo->threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO);
} else { // in case of non-prj query, a smaller output buffer will be used.
pResultInfo->capacity = 4096;
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * RESULT_THRESHOLD_RATIO);
pResultInfo->capacity = DEFAULT_MIN_ROWS;
}
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->total = 0;
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册