提交 6863999e 编写于 作者: H Haojun Liao

[td-14493] support having in group by

上级 33bcee62
...@@ -172,9 +172,9 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo ...@@ -172,9 +172,9 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle); bool isTsdbCacheLastRow(tsdbReaderT* pReader);
/** /**
* *
......
...@@ -3046,8 +3046,8 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { ...@@ -3046,8 +3046,8 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
// return code; // return code;
//} //}
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle) { bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
return ((STsdbReadHandle *)pTsdbReadHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE; return ((STsdbReadHandle *)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
} }
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) { int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {
......
...@@ -551,6 +551,7 @@ typedef struct SGroupbyOperatorInfo { ...@@ -551,6 +551,7 @@ typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
...@@ -630,9 +631,10 @@ typedef struct SDistinctOperatorInfo { ...@@ -630,9 +631,10 @@ typedef struct SDistinctOperatorInfo {
SHashObj* pSet; SHashObj* pSet;
SSDataBlock* pRes; SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold; // todo remove it // int64_t threshold; // todo remove it
int64_t outputCapacity;// todo remove it // int64_t outputCapacity;// todo remove it
int32_t totalBytes; // todo remove it // int32_t totalBytes; // todo remove it
SResultInfo resInfo;
char* buf; char* buf;
SArray* pDistinctDataInfo; SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
...@@ -651,6 +653,8 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); ...@@ -651,6 +653,8 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
uint64_t* total, SArray* pColList); uint64_t* total, SArray* pColList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
...@@ -667,7 +671,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -667,7 +671,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
......
...@@ -51,8 +51,6 @@ ...@@ -51,8 +51,6 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define MULTI_KEY_DELIM "-"
enum { enum {
TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TS_NOT_EQUALS = 1,
...@@ -221,12 +219,13 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); ...@@ -221,12 +219,13 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) { void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (pOperator->pTaskInfo != NULL) { if (pOperator->pTaskInfo != NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
} }
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
...@@ -1245,11 +1244,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction ...@@ -1245,11 +1244,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
size_t num = 0; size_t num = (pPseudoList != NULL)? taosArrayGetSize(pPseudoList):0;
if (pPseudoList != NULL) {
num = taosArrayGetSize(pPseudoList);
}
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
} }
...@@ -1291,8 +1286,10 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData ...@@ -1291,8 +1286,10 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k); pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k);
pCtx[k].offset = pResult->info.rows; // set the start offset pCtx[k].offset = pResult->info.rows; // set the start offset
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); if (taosArrayGetSize(pPseudoList) > 0) {
pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
}
int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]);
pResult->info.rows += numOfRows; pResult->info.rows += numOfRows;
...@@ -3272,6 +3269,44 @@ void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, ...@@ -3272,6 +3269,44 @@ void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx,
} }
} }
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
if (pFilterNode == NULL) {
return;
}
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows);
int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
numOfRow = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
*pSrc = *pDst;
}
pBlock->info.rows = numOfRow;
}
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0; int64_t uid = 0;
...@@ -6206,14 +6241,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6206,14 +6241,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
} }
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
taosMemoryFreeClear(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param; SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
...@@ -6785,146 +6812,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo ...@@ -6785,146 +6812,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
return pOperator; return pOperator;
} }
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited
return true;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfBlock);
for (int j = 0; j < numOfBlock; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
char* p = pInfo->buf;
memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
if (tmp == NULL) {
return NULL;
} else {
pResultColInfoData->pData = tmp;
}
}
pInfo->outputCapacity = newSize;
}
for (int32_t i = 0; i < pBlock->info.rows; i++) {
buildMultiDistinctKey(pInfo, pBlock, i);
if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) {
int32_t dummy;
taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy));
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->threshold) {
break;
}
}
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->resultInfo.capacity = 4096; // todo extract function.
pInfo->totalBytes = 0;
pInfo->buf = NULL;
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = DISTINCT;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = hashDistinct;
pOperator->closeFn = destroyDistinctOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = 0; int32_t j = 0;
...@@ -7147,7 +7035,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7147,7 +7035,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0; int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
...@@ -7157,8 +7044,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7157,8 +7044,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
...@@ -7215,7 +7101,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7215,7 +7101,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pAggNode->pGroupKeys != NULL) { if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
} else { } else {
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
} }
...@@ -7306,7 +7192,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa ...@@ -7306,7 +7192,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa
continue; continue;
} }
cond.colList[j].type = pColNode->node.resType.type; cond.colList[j].type = pColNode->node.resType.type;
cond.colList[j].bytes = pColNode->node.resType.bytes; cond.colList[j].bytes = pColNode->node.resType.bytes;
cond.colList[j].colId = pColNode->colId; cond.colList[j].colId = pColNode->colId;
j += 1; j += 1;
......
...@@ -245,13 +245,14 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -245,13 +245,14 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
} }
SGroupbyOperatorInfo* pInfo = pOperator->info; SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pInfo->binfo.pRes; return pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
...@@ -283,18 +284,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -283,18 +284,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
// pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.rowCellInfoOffset);
// } // }
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); blockDataEnsureCapacity(pRes, pInfo->binfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset); while(1) {
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity,
pOperator->status = OP_EXEC_DONE; pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
if (!hasRemain) {
pOperator->status = OP_EXEC_DONE;
break;
}
if (pRes->info.rows > 0) {
break;
}
} }
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) { const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -303,6 +315,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -303,6 +315,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
} }
pInfo->pGroupCols = pGroupColList; pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
...@@ -329,4 +342,166 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -329,4 +342,166 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }
\ No newline at end of file
#define MULTI_KEY_DELIM "-"
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
taosMemoryFreeClear(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
char* p = pInfo->buf;
// memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) {
for (int i = 0; i < pOperator->numOfOutput; i++) {
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
#if 0
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfCols);
for (int j = 0; j < numOfCols; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
#endif
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
// if (tmp == NULL) {
// return NULL;
// } else {
// pResultColInfoData->pData = tmp;
// }
}
pInfo->resInfo.capacity = newSize;
}
for (int32_t i = 0; i < pBlock->info.rows; i++) {
buildMultiDistinctKey(pInfo, pBlock, i);
if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) {
taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0);
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->resInfo.threshold) {
break;
}
}
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pOperator->resultInfo.capacity = 4096; // todo extract function.
// pInfo->totalBytes = 0;
pInfo->buf = NULL;
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
initMultiDistinctInfo(pInfo, pOperator);
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = DISTINCT;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = hashDistinct;
pOperator->closeFn = destroyDistinctOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SWITCH_ORDER(pCtx[i].order); SWITCH_ORDER(pCtx[i].order);
...@@ -91,39 +92,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, ...@@ -91,39 +92,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
} }
if (pTableScanInfo->pFilterNode != NULL) { doFilter(pTableScanInfo->pFilterNode, pBlock);
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows);
int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
numOfRow = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
*pSrc = *pDst;
}
pBlock->info.rows = numOfRow;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -37,7 +37,7 @@ typedef struct SHNode { ...@@ -37,7 +37,7 @@ typedef struct SHNode {
char data[]; char data[];
} SHNode; } SHNode;
typedef struct SSHashObj { struct SSHashObj {
SHNode **hashList; SHNode **hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
...@@ -45,7 +45,7 @@ typedef struct SSHashObj { ...@@ -45,7 +45,7 @@ typedef struct SSHashObj {
_equal_fn_t equalFp; // equal function _equal_fn_t equalFp; // equal function
int32_t keyLen; int32_t keyLen;
int32_t dataLen; int32_t dataLen;
} SSHashObj; };
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
int32_t len = MIN(length, HASH_MAX_CAPACITY); int32_t len = MIN(length, HASH_MAX_CAPACITY);
...@@ -107,7 +107,7 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat ...@@ -107,7 +107,7 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat
return pNewNode; return pNewNode;
} }
void taosHashTableResize(SSHashObj *pHashObj) { static void taosHashTableResize(SSHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) { if (!HASH_NEED_RESIZE(pHashObj)) {
return; return;
} }
......
...@@ -917,7 +917,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { ...@@ -917,7 +917,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
colDataAppendInt32(pOutput, pos, &delta); colDataAppendInt32(pOutput, pos, &delta);
} }
if (tsList != NULL) { if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]); colDataAppendInt64(pTsOutput, pos, &tsList[i]);
} }
} }
......
...@@ -311,10 +311,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo ...@@ -311,10 +311,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
// need the resize process, write lock applied // need the resize process, write lock applied
if (HASH_NEED_RESIZE(pHashObj)) { if (HASH_NEED_RESIZE(pHashObj)) {
...@@ -355,6 +351,11 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo ...@@ -355,6 +351,11 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
if (pNode == NULL) { if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table // no data in hash table with the specified key, add it into hash table
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
pushfrontNodeInEntryList(pe, pNewNode); pushfrontNodeInEntryList(pe, pNewNode);
assert(pe->next != NULL); assert(pe->next != NULL);
...@@ -368,9 +369,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo ...@@ -368,9 +369,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
} else { } else {
// not support the update operation, return error // not support the update operation, return error
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
FREE_HASH_NODE(pNewNode);
} }
taosHashEntryWUnlock(pHashObj, pe); taosHashEntryWUnlock(pHashObj, pe);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册