未验证 提交 bb04d43a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10939 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -541,10 +541,20 @@ typedef struct SFillOperatorInfo { ...@@ -541,10 +541,20 @@ typedef struct SFillOperatorInfo {
bool multigroupResult; bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupKeys {
char *pData;
bool isNull;
int16_t type;
int32_t bytes;
}SGroupKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
char* prevData; // previous group by value SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SAggSupporter aggSup; SAggSupporter aggSup;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
......
...@@ -1623,67 +1623,142 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1623,67 +1623,142 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pBlock) { static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
// TODO multiple group by columns SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
int16_t bytes = pColInfoData->info.bytes; pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
return;
} }
STimeWindow w = TSWINDOW_INITIALIZER; bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
int32_t num = 0; SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
for (int32_t j = 0; j < pBlock->info.rows; ++j) { if (pkey->isNull && isNull) {
if (colDataIsNull(pColInfoData, pBlock->info.rows, j, NULL)) { // TODO
continue; continue;
} }
char* val = colDataGetData(pColInfoData, j); if (isNull || pkey->isNull) {
return false;
}
// Compare with the previous row of this column, and do not set the output buffer again if they are identical. char* val = colDataGetData(pColInfoData, rowIndex);
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes); if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pInfo->prevData, val, bytes); int32_t len = varDataLen(val);
num++; if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
continue; continue;
} else {
return false;
}
} else {
if (memcmp(pkey->pData, val, pkey->bytes) != 0) {
return false;
}
}
} }
if (IS_VAR_DATA_TYPE(type)) { return true;
int32_t len = varDataLen(val); }
if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) {
num++; static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
} else {
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
}
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
char* isNull = (char*) pKey;
char* pStart = (char*) pKey + sizeof(int8_t) * numOfGroupCols;
for(int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (pkey->isNull) {
isNull[i] = 1;
continue; continue;
} }
isNull[i] = 0;
if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else { } else {
if (memcmp(pInfo->prevData, val, bytes) == 0) { memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
*length = (pStart - (char*) pKey);
return 0;
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo *pInfo = pOperator->info;
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
// return;
// }
int32_t len = 0;
STimeWindow w = TSWINDOW_INITIALIZER;
int32_t num = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) {
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
pInfo->isInit = true;
num++; num++;
continue; continue;
} }
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols);
if (equal) {
num++;
continue;
} }
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0, /*int32_t ret = */generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
num = 1; num = 1;
memcpy(pInfo->prevData, val, bytes);
} }
if (num > 0) { if (num > 0) {
char* val = ((char*)pColInfoData->pData) + bytes * (pBlock->info.rows - num); /*int32_t ret = */generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
memcpy(pInfo->prevData, val, bytes); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0,
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
...@@ -1691,8 +1766,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1691,8 +1766,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
} }
tfree(pInfo->prevData);
} }
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
...@@ -1783,20 +1856,11 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { ...@@ -1783,20 +1856,11 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId, static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId,
SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) { SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) {
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx *pCtx = binfo->pCtx; SqlFunctionCtx *pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, bytes, true, groupId,
char* d = pData; pTaskInfo, true, pAggSup);
int16_t len = bytes;
if (IS_VAR_DATA_TYPE(type)) {
d = varDataVal(pData);
len = varDataLen(pData);
}
int64_t tid = 0;
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, TSDB_KEYSIZE, true, groupId, pTaskInfo, true, pAggSup);
assert (pResultRow != NULL); assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type); setResultRowKey(pResultRow, pData, type);
...@@ -6969,7 +7033,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou ...@@ -6969,7 +7033,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
doHashGroupbyAgg(pOperator, pInfo, pBlock); doHashGroupbyAgg(pOperator, pBlock);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
...@@ -7281,7 +7345,9 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7281,7 +7345,9 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData); tfree(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
} }
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -7664,6 +7730,39 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun ...@@ -7664,6 +7730,39 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
return pOperator; return pOperator;
} }
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo *pInfo, SArray* pGroupColList) {
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pGroupColVals == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for(int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i);
pInfo->groupKeyLen += pCol->bytes;
struct SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
key.pData = calloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pInfo->pGroupColVals, &key);
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
pInfo->keyBuf = calloc(1, pInfo->groupKeyLen + nullFlagSize);
if (pInfo->keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
...@@ -7674,9 +7773,13 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -7674,9 +7773,13 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList; pInfo->pGroupCols = pGroupColList;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -7688,7 +7791,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -7688,7 +7791,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->getNextFn = hashGroupbyAggregate; pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo; pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error: _error:
...@@ -8345,7 +8448,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8345,7 +8448,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'}; SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset};
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
} }
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册