提交 549c6363 编写于 作者: M Minglei Jin

feat(query): group by multi columns

上级 9d4f88cd
......@@ -3958,8 +3958,8 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
const char* msg4 = "join query does not support group by";
const char* msg5 = "not allowed column type for group by";
const char* msg6 = "tags not allowed for table query";
const char* msg7 = "not support group by expression";
const char* msg8 = "normal column can only locate at the end of group by clause";
//const char* msg7 = "not support group by primary key";
//const char* msg8 = "normal column can only locate at the end of group by clause";
const char* msg9 = "json tag must be use ->'key'";
const char* msg10 = "non json column can not use ->'key'";
const char* msg11 = "group by json->'key' is too long";
......@@ -4070,7 +4070,10 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
if (pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
}/*
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}*/
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
......@@ -4085,14 +4088,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
// 1. only one normal column allowed in the group by clause
// 2. the normal column in the group by clause can only located in the end position
if (numOfGroupCols > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
//if (numOfGroupCols > 1) {
// return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
//}
for(int32_t i = 0; i < num; ++i) {
SColIndex* pIndex = taosArrayGet(pGroupExpr->columnInfo, i);
if (TSDB_COL_IS_NORMAL_COL(pIndex->flag) && i != num - 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
if (!TSDB_COL_IS_NORMAL_COL(pIndex->flag)) {
//return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
}
......
......@@ -546,10 +546,17 @@ typedef struct SFillOperatorInfo {
bool multigroupResult;
} SFillOperatorInfo;
typedef struct SGroupbyDataInfo {
int32_t index; // index of col in dataBlock
int32_t type;
int32_t bytes;
} SGroupbyDataInfo;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
int32_t colIndex;
char *prevData; // previous group by value
SArray *pGroupbyDataInfo;
int32_t totalBytes;
char *prevData; // previous data buf
} SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo {
......
......@@ -258,7 +258,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t
static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
......@@ -1569,56 +1569,132 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static bool initGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) {
if (pInfo->pGroupbyDataInfo != NULL) {
// no need build group-by info
return true;
}
pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo));
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
continue;
}
for (int32_t i = 0; i < pSDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pSDataBlock->pDataBlock, i);
if (pColInfo->info.colId == pColIndex->colId) {
int32_t type = pColInfo->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
return false;
}
pInfo->totalBytes += pColInfo->info.bytes;
SGroupbyDataInfo info = {.index = i, .type = pColInfo->info.type, .bytes = pColInfo->info.bytes};
taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info);
break;
}
if (i == pSDataBlock->info.numOfCols - 1) {
// not found groupby col in dataBlock, error
return false;
}
}
}
pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * pGroupbyExpr->numOfGroupCols;
return true;
}
static void buildGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) {
char *p = calloc(1, pInfo->totalBytes);
if (p == NULL) {
*buf = NULL;
return;
}
*buf = p;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index);
//TODO(yihaoDeng): handle float & double
char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId;
if (isNull(val, pDataInfo->type)) {
p += pDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDataInfo->bytes);
p += pDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static bool isGroupbyKeyEqual(void *a, void *b, void *ext) {
SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext;
if (memcmp(a, b, pInfo->totalBytes) == 0) {
return true;
}
int32_t offset = 0;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
char *k1 = (char *)a + offset;
char *k2 = (char *)b + offset;
if (getComparFunc(pDataInfo->type, 0)(k1, k2) != 0) {
return false;
}
offset += pDataInfo->bytes;
offset += (int32_t)strlen(MULTI_KEY_DELIM);
}
return true;
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
if (!initGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) {
qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
return;
}
//realloc pRuntimeEnv->keyBuf
pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, pInfo->totalBytes + sizeof(int64_t) + POINTER_BYTES);
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL;
STimeWindow w = TSWINDOW_INITIALIZER;
int32_t num = 0;
char *key = NULL;
int16_t num = 0;
int32_t type = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key);
if (!key) { continue; }
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
// first row of
pInfo->prevData = key;
num++;
continue;
}
if (IS_VAR_DATA_TYPE(type)) {
int32_t len = varDataLen(val);
if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) {
num++;
continue;
}
} else {
if (memcmp(pInfo->prevData, val, bytes) == 0) {
num++;
continue;
}
} else if (isGroupbyKeyEqual(pInfo->prevData, key, pInfo)) {
num++;
tfree(key);
continue;
}
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes);
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo);
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex);
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1626,23 +1702,25 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
num = 1;
memcpy(pInfo->prevData, val, bytes);
tfree(pInfo->prevData);
pInfo->prevData = key;
}
if (num > 0) {
char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num);
memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
buildGroupbyKeyBuf(pSDataBlock, pInfo, pSDataBlock->info.rows - num, &key);
if (key) {
tfree(pInfo->prevData);
pInfo->prevData = key;
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo);
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
}
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
}
tfree(pInfo->prevData);
......@@ -1717,22 +1795,22 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
if (IS_VAR_DATA_TYPE(type)) {
if (pResultRow->key == NULL) {
pResultRow->key = malloc(varDataTLen(pData));
varDataCopy(pResultRow->key, pData);
} else {
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
}
} else {
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
pResultRow->win.skey = v;
pResultRow->win.ekey = v;
}
}
//static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
// if (IS_VAR_DATA_TYPE(type)) {
// if (pResultRow->key == NULL) {
// pResultRow->key = malloc(varDataTLen(pData));
// varDataCopy(pResultRow->key, pData);
// } else {
// assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
// }
// } else {
// int64_t v = -1;
// GET_TYPED_DATA(v, int64_t, type, pData);
//
// pResultRow->win.skey = v;
// pResultRow->win.ekey = v;
// }
//}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
......@@ -1744,16 +1822,16 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
int16_t len = bytes;
if (IS_VAR_DATA_TYPE(type)) {
d = varDataVal(pData);
len = varDataLen(pData);
}
//if (IS_VAR_DATA_TYPE(type)) {
// d = varDataVal(pData);
// len = varDataLen(pData);
//}
int64_t tid = 0;
SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
//setResultRowKey(pResultRow, pData, type);
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize);
if (ret != 0) {
......@@ -4161,7 +4239,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQueryAttr->numOfOutput;
......@@ -4174,6 +4252,20 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// find colid in dataBlock
int32_t bytes, offset = 0;
char* val = NULL;
for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pGroupbyDataInfo); idx++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, idx);
if (pDataInfo->index == pExpr1->colInfo.colId) {
bytes = pDataInfo->bytes;
val = pInfo->prevData + offset;
break;
}
offset += pDataInfo->bytes;
}
if (val == NULL) { continue; }
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
......@@ -7074,10 +7166,6 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order);
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
doHashGroupbyAgg(pOperator, pInfo, pBlock);
}
......@@ -7352,7 +7440,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(&pInfo->pGroupbyDataInfo);
if (pInfo->prevData) {
tfree(pInfo->prevData);
}
......@@ -7829,7 +7917,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return NULL;
}
pInfo->colIndex = -1; // group by column index
//pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......
......@@ -783,13 +783,13 @@ if $data11 != 2 then
return -1
endi
sql_error select count(*) from m1 group by tbname,k,f1;
sql_error select count(*) from m1 group by tbname,k,a;
sql_error select count(*) from m1 group by k, tbname;
sql_error select count(*) from m1 group by k,f1;
#sql_error select count(*) from m1 group by tbname,k,f1;
#sql_error select count(*) from m1 group by tbname,k,a;
#sql_error select count(*) from m1 group by k, tbname;
#sql_error select count(*) from m1 group by k,f1;
sql_error select count(*) from tm0 group by tbname;
sql_error select count(*) from tm0 group by a;
sql_error select count(*) from tm0 group by k,f1;
#sql_error select count(*) from tm0 group by k,f1;
sql_error select count(*),f1 from m1 group by tbname,k;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册