提交 6f11fd21 编写于 作者: H Haojun Liao

[td-13039] support group by multi-columns.

上级 2748f84c
......@@ -541,10 +541,20 @@ typedef struct SFillOperatorInfo {
bool multigroupResult;
} SFillOperatorInfo;
typedef struct SGroupKeys {
char *pData;
bool isNull;
int16_t type;
int32_t bytes;
}SGroupKeys;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols;
char* prevData; // previous group by value
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SAggSupporter aggSup;
} SGroupbyOperatorInfo;
......
......@@ -1623,67 +1623,142 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pBlock) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
// TODO multiple group by columns
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
return;
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
if (pkey->isNull && isNull) {
continue;
}
if (isNull || pkey->isNull) {
return false;
}
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
int32_t len = varDataLen(val);
if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
continue;
} else {
return false;
}
} else {
if (memcmp(pkey->pData, val, pkey->bytes) != 0) {
return false;
}
}
}
STimeWindow w = TSWINDOW_INITIALIZER;
return true;
}
int32_t num = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pColInfoData, pBlock->info.rows, j, NULL)) { // TODO
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
} else {
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
}
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
char* isNull = (char*) pKey;
char* pStart = (char*) pKey + sizeof(int8_t) * numOfGroupCols;
for(int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (pkey->isNull) {
isNull[i] = 1;
continue;
}
char* val = colDataGetData(pColInfoData, j);
isNull[i] = 0;
if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else {
memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
*length = (pStart - (char*) pKey);
return 0;
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo *pInfo = pOperator->info;
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
// return;
// }
int32_t len = 0;
STimeWindow w = TSWINDOW_INITIALIZER;
int32_t num = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
if (!pInfo->isInit) {
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
pInfo->isInit = true;
num++;
continue;
}
if (IS_VAR_DATA_TYPE(type)) {
int32_t len = varDataLen(val);
if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) {
num++;
continue;
}
} else {
if (memcmp(pInfo->prevData, val, bytes) == 0) {
num++;
continue;
}
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols);
if (equal) {
num++;
continue;
}
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
/*int32_t ret = */generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
num = 1;
memcpy(pInfo->prevData, val, bytes);
}
if (num > 0) {
char* val = ((char*)pColInfoData->pData) + bytes * (pBlock->info.rows - num);
memcpy(pInfo->prevData, val, bytes);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0,
/*int32_t ret = */generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0,
pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
......@@ -1691,8 +1766,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
}
tfree(pInfo->prevData);
}
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
......@@ -1783,20 +1856,11 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId,
SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) {
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx *pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
int16_t len = bytes;
if (IS_VAR_DATA_TYPE(type)) {
d = varDataVal(pData);
len = varDataLen(pData);
}
int64_t tid = 0;
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, TSDB_KEYSIZE, true, groupId, pTaskInfo, true, pAggSup);
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, bytes, true, groupId,
pTaskInfo, true, pAggSup);
assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
......@@ -6969,7 +7033,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
doHashGroupbyAgg(pOperator, pInfo, pBlock);
doHashGroupbyAgg(pOperator, pBlock);
}
pOperator->status = OP_RES_TO_RETURN;
......@@ -7281,7 +7345,9 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
tfree(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
}
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -7664,6 +7730,39 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
return pOperator;
}
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo *pInfo, SArray* pGroupColList) {
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pGroupColVals == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for(int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i);
pInfo->groupKeyLen += pCol->bytes;
struct SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
key.pData = calloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pInfo->pGroupColVals, &key);
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
pInfo->keyBuf = calloc(1, pInfo->groupKeyLen + nullFlagSize);
if (pInfo->keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
......@@ -7674,9 +7773,13 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
......@@ -7688,7 +7791,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册