提交 866b6fb6 编写于 作者: Y yihaoDeng

[TD-4335]<feature> group by multi column

上级 3224c7d2
...@@ -204,7 +204,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t ...@@ -204,7 +204,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t
static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex); int32_t groupIndex);
...@@ -1329,16 +1329,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1329,16 +1329,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) { static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) {
// check inited or not
if (pInfo->prevData != NULL) { if (pInfo->prevData != NULL) {
// no need build group-by info
return true; return true;
} }
pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo)); pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo));
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pSDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pSDataBlock->pDataBlock, i);
if (pColInfo->info.colId == pColIndex->colId) { if (pColInfo->info.colId == pColIndex->colId) {
int32_t type = pColInfo->info.type; int32_t type = pColInfo->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
...@@ -1350,7 +1350,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr ...@@ -1350,7 +1350,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr
taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info); taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info);
break; break;
} }
if (i == pDataBlock->info.numOfCols - 1) { if (i == pSDataBlock->info.numOfCols - 1) {
// not found groupby col in dataBlock, error // not found groupby col in dataBlock, error
return false; return false;
} }
...@@ -1358,7 +1358,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr ...@@ -1358,7 +1358,7 @@ static bool buildGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr
} }
return true; return true;
} }
static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) { static void buildGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) {
char *p = calloc(1, pInfo->totalBytes); char *p = calloc(1, pInfo->totalBytes);
if (p == NULL) { *buf = NULL; return; } if (p == NULL) { *buf = NULL; return; }
...@@ -1367,12 +1367,12 @@ static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperator ...@@ -1367,12 +1367,12 @@ static void createGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperator
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index); SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index);
//TODO(yihaoDeng): handle float & double
char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId; char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId;
if (isNull(val, pDataInfo->type)) { if (isNull(val, pDataInfo->type)) {
p += pDataInfo->bytes; p += pDataInfo->bytes;
continue; continue;
} }
memcpy(p, val, pDataInfo->bytes); memcpy(p, val, pDataInfo->bytes);
p += pDataInfo->bytes; p += pDataInfo->bytes;
} }
...@@ -1381,7 +1381,7 @@ static bool isGroupbyKeyEqual(void *a, void *b, void *ext) { ...@@ -1381,7 +1381,7 @@ static bool isGroupbyKeyEqual(void *a, void *b, void *ext) {
SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext; SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext;
int32_t offset = 0; int32_t offset = 0;
for (i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i);
char *k1 = (char *)a + offset; char *k1 = (char *)a + offset;
char *k2 = (char *)b + offset; char *k2 = (char *)b + offset;
...@@ -1398,7 +1398,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1398,7 +1398,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
if (buildGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) { if (!buildGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) {
qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
return; return;
} }
...@@ -1410,11 +1410,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1410,11 +1410,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
char *key = NULL; char *key = NULL;
int32_t num = 0; int16_t num = 0;
int32_t type = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
createGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key);
if (key == NULL) {}
if (key == NULL) { /* handle malloc failure*/}
if (pInfo->prevData == NULL) { if (pInfo->prevData == NULL) {
// first row of // first row of
pInfo->prevData = key; pInfo->prevData = key;
...@@ -1425,12 +1426,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1425,12 +1426,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
tfree(key); tfree(key);
continue; continue;
} }
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes); setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo);
} }
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1438,25 +1439,22 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1438,25 +1439,22 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
num = 1; num = 1;
tfree(pInfo->prevData); tfree(pInfo->prevData);
pInfo->prevData = key; pInfo->prevData = key;
} }
if (num > 0) { if (num > 0) {
char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num);
memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo);
} }
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
tfree(pInfo->prevData);
} }
} }
...@@ -1556,10 +1554,10 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic ...@@ -1556,10 +1554,10 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
// not assign result buffer yet, add new result buffer, TODO remove it // not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData; char* d = pData;
int16_t len = bytes; int16_t len = bytes;
if (IS_VAR_DATA_TYPE(type)) { //if (IS_VAR_DATA_TYPE(type)) {
d = varDataVal(pData); // d = varDataVal(pData);
len = varDataLen(pData); // len = varDataLen(pData);
} //}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex); SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
assert (pResultRow != NULL); assert (pResultRow != NULL);
...@@ -1826,7 +1824,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1826,7 +1824,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t)); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + 64);
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen); pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen);
...@@ -3651,7 +3649,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -3651,7 +3649,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
} }
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQueryAttr->numOfOutput; int32_t numOfExprs = pQueryAttr->numOfOutput;
...@@ -3663,6 +3661,23 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction ...@@ -3663,6 +3661,23 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
pCtx[i].param[0].arr = NULL; pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// find colid in dataBlock
int32_t bytes, offset = 0;
char* val = NULL;
for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pGroupbyDataInfo); idx++) {
SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, idx);
if (pDataInfo->index == pExpr1->colInfo.colId) {
bytes = pDataInfo->bytes;
val = pInfo->prevData + offset;
break;
}
offset += pDataInfo->bytes;
if (idx == taosArrayGetSize(pInfo->pGroupbyDataInfo) - 1) {
continue;
}
}
// TODO use hash to speedup this loop // TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
...@@ -6146,8 +6161,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -6146,8 +6161,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册