提交 d6af5559 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 afcc2219
......@@ -60,16 +60,10 @@ typedef struct SDataBlockInfo {
int16_t numOfCols;
int16_t hasVarCol;
union {int64_t uid; int64_t blockId;};
int64_t groupId; // no need to serialize
} SDataBlockInfo;
//typedef struct SConstantItem {
// SColumnInfo info;
// int32_t startRow; // run-length-encoding to save the space for multiple rows
// int32_t endRow;
// SVariant value;
//} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
// info.numOfCols = taosArrayGetSize(pDataBlock)
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
......
......@@ -545,6 +545,7 @@ typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
int32_t colIndex;
char* prevData; // previous group by value
SGroupResInfo groupResInfo;
} SGroupbyOperatorInfo;
typedef struct SSessionAggOperatorInfo {
......@@ -649,8 +650,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
......@@ -673,7 +673,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
......@@ -697,11 +696,8 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
......
......@@ -336,27 +336,6 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
return res;
}
SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput;
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}};
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
idata.info.type = pExpr->base.resSchema.type;
idata.info.bytes = pExpr->base.resSchema.bytes;
idata.info.colId = pExpr->base.resSchema.colId;
taosArrayPush(res->pDataBlock, &idata);
}
blockDataEnsureCapacity(res, numOfRows);
return res;
}
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
......@@ -3327,41 +3306,6 @@ int32_t initResultRow(SResultRow *pResultRow) {
* +------------+--------------------------------------------+--------------------------------------------+
* offset[0] offset[1] offset[2]
*/
void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) {
SqlFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int64_t tid = 0;
pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry;
pCtx[i].pOutput = pData->pData;
pCtx[i].currentStage = stage;
assert(pCtx[i].pOutput != NULL);
// set the timestamp output buffer for top/bottom/diff query
int32_t fid = pCtx[i].functionId;
if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
}
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
}
// TODO refactor: some function move away
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx;
......@@ -4024,81 +3968,6 @@ static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
int32_t *compSizes = NULL;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
if (compressed) {
compSizes = calloc(numOfCols, sizeof(int32_t));
}
if (pQueryAttr->pExpr2 == NULL) {
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
if (compressed) {
compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed);
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
data += pColRes->info.bytes * pRes->info.rows;
}
}
} else {
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
if (compressed) {
compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed));
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows;
}
}
}
if (compressed) {
memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t));
data += numOfCols * sizeof(int32_t);
tfree(compSizes);
}
int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
int32_t total = 0;
STableIdInfo* item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, NULL);
while(item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
total++;
//qDebug("QInfo:0x%"PRIx64" set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo->qId, item->tid, item->uid, item->key);
item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, item);
}
//qDebug("QInfo:0x%"PRIx64" set %d subscribe info", pQInfo->qId, total);
// Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) {
// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
}
}
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) {
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
......@@ -4119,10 +3988,10 @@ void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType e
event.operatorType = operatorInfo->operatorType;
if (operatorInfo->pRuntimeEnv) {
SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo;
if (pQInfo->summary.queryProfEvents) {
taosArrayPush(pQInfo->summary.queryProfEvents, &event);
}
// SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo;
// if (pQInfo->summary.queryProfEvents) {
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
// }
}
}
......@@ -6787,10 +6656,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->binfo.pRes;
}
......@@ -6883,8 +6748,8 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgr
pOperator->status = OP_EXEC_DONE;
}
SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
// SQInfo* pQInfo = pRuntimeEnv->qinfo;
// pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->binfo.pRes;
}
......@@ -7075,19 +6940,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
}
SGroupbyOperatorInfo *pInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0/*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t order = TSDB_ORDER_ASC;
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -7098,10 +6960,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order);
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
pInfo->colIndex = getGroupbyColumnIndex(NULL/*pInfo->pGroupbyExpr*/, pBlock);
}
doHashGroupbyAgg(pOperator, pInfo, pBlock);
......@@ -7109,21 +6971,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
// if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
} else {
updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
}
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// }
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);
if (!pRuntimeEnv->pQueryAttr->stableQuery) {
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
}
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
// if (!pRuntimeEnv->pQueryAttr->stableQuery) {
// sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
// }
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -7802,19 +7663,16 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
return pOperator;
}
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize *
(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
// pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize *
// (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = pResBlock;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -7825,11 +7683,11 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
......@@ -8330,12 +8188,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
SDataType* pType = &pFuncNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pFuncNode->node.aliasName);
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
pExp->pExpr->_function.pFunctNode = pFuncNode;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
tListLen(pExp->pExpr->_function.functionName));
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
// TODO: value parameter needs to be handled
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
......@@ -8933,30 +8789,6 @@ int32_t checkForQueryBuf(size_t numOfTables) {
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}
bool checkNeedToCompressQueryCol(SQInfo *pQInfo) {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) <= 0) {
return false;
}
int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * numOfRows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
void releaseQueryBuf(size_t numOfTables) {
if (tsQueryBufferSizeBytes < 0) {
return;
......
......@@ -52,6 +52,8 @@ sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0)
#===================================================================
print =============== query data from child table
sql select * from ct1
print ========> value is : $data00
if $rows != 4 then # after fix bug, modify 4 to 7
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册