提交 c84afedc 编写于 作者: H Haojun Liao

[td-14493] refactor.

上级 f4047aa1
......@@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SSortOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj* pSet;
SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again
// int64_t threshold; // todo remove it
// int64_t outputCapacity;// todo remove it
// int32_t totalBytes; // todo remove it
SResultInfo resInfo;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
......@@ -682,8 +664,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
......@@ -705,6 +689,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
#endif
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
......
......@@ -1610,7 +1610,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false);
updateTimeWindowInfo(&pInfo->timeWindowData, &win, true);
updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
......@@ -3276,6 +3276,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
SFilterInfo* filter = NULL;
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
......
......@@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock,
}
}
static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
......@@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa
}
}
*length = (pStart - (char*)pKey);
return 0;
return (int32_t) (pStart - (char*)pKey);
}
// assign the group keys or user input constant values if required
......@@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
continue;
}
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
......@@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
if (num > 0) {
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
......@@ -351,164 +350,40 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return NULL;
}
#define MULTI_KEY_DELIM "-"
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
taosMemoryFreeClear(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
char* p = pInfo->buf;
// memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) {
for (int i = 0; i < pOperator->numOfOutput; i++) {
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
#if 0
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfCols);
for (int j = 0; j < numOfCols; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
#endif
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
// if (tmp == NULL) {
// return NULL;
// } else {
// pResultColInfoData->pData = tmp;
// }
}
pInfo->resInfo.capacity = newSize;
}
for (int32_t i = 0; i < pBlock->info.rows; i++) {
buildMultiDistinctKey(pInfo, pBlock, i);
if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) {
taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0);
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->resInfo.threshold) {
break;
}
}
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pOperator->resultInfo.capacity = 4096; // todo extract function.
// pInfo->totalBytes = 0;
pInfo->buf = NULL;
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
initMultiDistinctInfo(pInfo, pOperator);
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "DistinctOperator";
pOperator->name = "PartitionByOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = DISTINCT;
pOperator->pExpr = pExpr;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->getNextFn = hashDistinct;
pOperator->closeFn = destroyDistinctOperatorInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册