提交 3224c7d2 编写于 作者: Y yihaoDeng

[TD-4335]<feature> group by multi column

上级 d0a15fea
...@@ -3191,8 +3191,8 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd ...@@ -3191,8 +3191,8 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
const char* msg4 = "join query does not support group by"; const char* msg4 = "join query does not support group by";
const char* msg5 = "not allowed column type for group by"; const char* msg5 = "not allowed column type for group by";
const char* msg6 = "tags not allowed for table query"; const char* msg6 = "tags not allowed for table query";
const char* msg7 = "not support group by expression"; //const char* msg7 = "not support group by expression";
const char* msg8 = "normal column can only locate at the end of group by clause"; //const char* msg8 = "normal column can only locate at the end of group by clause";
// todo : handle two tables situation // todo : handle two tables situation
STableMetaInfo* pTableMetaInfo = NULL; STableMetaInfo* pTableMetaInfo = NULL;
...@@ -3291,14 +3291,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd ...@@ -3291,14 +3291,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
// 1. only one normal column allowed in the group by clause // 1. only one normal column allowed in the group by clause
// 2. the normal column in the group by clause can only located in the end position // 2. the normal column in the group by clause can only located in the end position
if (numOfGroupCols > 1) { //if (numOfGroupCols > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); // return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} //}
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SColIndex* pIndex = taosArrayGet(pGroupExpr->columnInfo, i); SColIndex* pIndex = taosArrayGet(pGroupExpr->columnInfo, i);
if (TSDB_COL_IS_NORMAL_COL(pIndex->flag) && i != num - 1) { if (!TSDB_COL_IS_NORMAL_COL(pIndex->flag)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); //return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
} }
} }
......
...@@ -478,10 +478,17 @@ typedef struct SFillOperatorInfo { ...@@ -478,10 +478,17 @@ typedef struct SFillOperatorInfo {
SSDataBlock *existNewGroupBlock; SSDataBlock *existNewGroupBlock;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyDataInfo {
int32_t index; // index of col in dataBlock
int32_t type;
int32_t bytes;
} SGroupbyDataInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t colIndex; SArray *pGroupbyDataInfo;
char *prevData; // previous group by value int32_t totalBytes;
char *prevData; // previous data buf
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo { typedef struct SSWindowOperatorInfo {
......
...@@ -1328,53 +1328,103 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1328,53 +1328,103 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) {
// check inited or not
if (pInfo->prevData != NULL) {
return true;
}
pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo));
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
if (pColInfo->info.colId == pColIndex->colId) {
int32_t type = pColInfo->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
return false;
}
pInfo->totalBytes += pColInfo->info.bytes;
SGroupbyDataInfo info = {.index = i, .type = pColInfo->info.type, .bytes = pColInfo->info.bytes};
taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info);
break;
}
if (i == pDataBlock->info.numOfCols - 1) {
// not found groupby col in dataBlock, error
return false;
}
}
}
return true;
}
static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) {
char *p = calloc(1, pInfo->totalBytes);
if (p == NULL) { *buf = NULL; return; }
*buf = p;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index);
char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId;
if (isNull(val, pDataInfo->type)) {
p += pDataInfo->bytes;
continue;
}
memcpy(p, val, pDataInfo->bytes);
p += pDataInfo->bytes;
}
}
static bool isGroupbyKeyEqual(void *a, void *b, void *ext) {
SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext;
int32_t offset = 0;
for (i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
char *k1 = (char *)a + offset;
char *k2 = (char *)b + offset;
if (getComparFunc(pDataInfo->type, 0)(k1, k2) != 0) {
return false;
}
offset += pDataInfo->bytes;
}
return true;
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current; STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { if (buildGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) {
qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
return; return;
} }
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL; int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL;
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
char *key = NULL;
int32_t num = 0; int32_t num = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j; createGroupbyKeyBuf(pSDataBlock, pInfo, j, &key);
if (isNull(val, type)) { if (key == NULL) {}
continue;
}
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL) { if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes); // first row of
memcpy(pInfo->prevData, val, bytes); pInfo->prevData = key;
num++; num++;
continue; continue;
} } else if (isGroupbyKeyEqual(pInfo->prevData, key, pInfo)) {
num++;
if (IS_VAR_DATA_TYPE(type)) { tfree(key);
int32_t len = varDataLen(val); continue;
if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) { }
num++;
continue;
}
} else {
if (memcmp(pInfo->prevData, val, bytes) == 0) {
num++;
continue;
}
}
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes); setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes);
...@@ -1388,7 +1438,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1388,7 +1438,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
num = 1; num = 1;
memcpy(pInfo->prevData, val, bytes); tfree(pInfo->prevData);
pInfo->prevData = key;
} }
if (num > 0) { if (num > 0) {
...@@ -1399,7 +1450,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1399,7 +1450,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
} }
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex); int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -5631,10 +5682,6 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -5631,10 +5682,6 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order);
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
doHashGroupbyAgg(pOperator, pInfo, pBlock); doHashGroupbyAgg(pOperator, pInfo, pBlock);
} }
...@@ -5852,6 +5899,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5852,6 +5899,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->pGroupbyDataInfo);
tfree(pInfo->prevData); tfree(pInfo->prevData);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册