提交 e5b9fdbd 编写于 作者: wmmhello's avatar wmmhello

fix error in unique group by logic

上级 c39078a3
...@@ -440,6 +440,15 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu ...@@ -440,6 +440,15 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
rlen += pExpr->base.resBytes; rlen += pExpr->base.resBytes;
} }
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
while((pg - overhead) < rlen * 2) {
pg *= 2;
}
if (*nBufferSizes < pg){
*nBufferSizes = 2 * pg;
}
int32_t capacity = 0; int32_t capacity = 0;
if (rlen != 0) { if (rlen != 0) {
if ((*nBufferSizes) < rlen) { if ((*nBufferSizes) < rlen) {
...@@ -454,12 +463,6 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu ...@@ -454,12 +463,6 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
while((pg - overhead) < rlen * 2) {
pg *= 2;
}
assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups); assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSub; ++i) { for (int32_t i = 0; i < numOfSub; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel); (*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel);
......
...@@ -4938,7 +4938,11 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -4938,7 +4938,11 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
pse->colInfo.colIndex = i; pse->colInfo.colIndex = i;
pse->colType = pExpr->base.resType; pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes; if(pExpr->base.resBytes > INT16_MAX && pExpr->base.functionId == TSDB_FUNC_UNIQUE){
pQueryAttr->interBytesForGlobal = pExpr->base.resBytes;
}else{
pse->colBytes = pExpr->base.resBytes;
}
} }
{ {
......
...@@ -54,7 +54,7 @@ typedef struct SSqlExpr { ...@@ -54,7 +54,7 @@ typedef struct SSqlExpr {
int32_t resBytes; // length of return value int32_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size int32_t interBytes; // inter result buffer size
int16_t colType; // table column type int16_t colType; // table column type, this should be int32_t, because it is too small for globale merge stage, pQueryAttr->interBytesForGlobal
int16_t colBytes; // table column bytes int16_t colBytes; // table column bytes
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
......
...@@ -175,7 +175,7 @@ typedef struct SQLFunctionCtx { ...@@ -175,7 +175,7 @@ typedef struct SQLFunctionCtx {
void * pInput; // input data buffer void * pInput; // input data buffer
uint32_t order; // asc|desc uint32_t order; // asc|desc
int16_t inputType; int16_t inputType;
int16_t inputBytes; int32_t inputBytes;
int16_t outputType; int16_t outputType;
int32_t outputBytes; // size of results, determined by function and input column data type int32_t outputBytes; // size of results, determined by function and input column data type
...@@ -202,7 +202,7 @@ typedef struct SQLFunctionCtx { ...@@ -202,7 +202,7 @@ typedef struct SQLFunctionCtx {
SPoint1 start; SPoint1 start;
SPoint1 end; SPoint1 end;
SHashObj *pUniqueSet; // for unique function SHashObj **pUniqueSet; // for unique function
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SAggFunctionInfo { typedef struct SAggFunctionInfo {
......
...@@ -90,6 +90,7 @@ typedef struct SResultRow { ...@@ -90,6 +90,7 @@ typedef struct SResultRow {
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
STimeWindow win; STimeWindow win;
char *key; // start key of current result row char *key; // start key of current result row
SHashObj *uniqueHash; // for unique function
} SResultRow; } SResultRow;
typedef struct SResultRowCell { typedef struct SResultRowCell {
...@@ -282,7 +283,7 @@ typedef struct SQueryAttr { ...@@ -282,7 +283,7 @@ typedef struct SQueryAttr {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
SArray *pUdfInfo; // no need to free SArray *pUdfInfo; // no need to free
int32_t maxUniqueResult; int32_t interBytesForGlobal;
} SQueryAttr; } SQueryAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
......
...@@ -70,8 +70,12 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage* ...@@ -70,8 +70,12 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage*
int32_t offset) { int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL); assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); int64_t numOfRows = (int64_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows; numOfRows *= offset;
if(numOfRows >= INT32_MAX){
assert(0);
}
return ((char *)page->data) + rowOffset + numOfRows;
} }
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......
...@@ -5190,11 +5190,13 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes ...@@ -5190,11 +5190,13 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
*pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
return true; return true;
} }
static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes){ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes){
UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, bytes); UniqueUnit *unique = taosHashGet(*pCtx->pUniqueSet, pData, bytes);
if (unique == NULL) { if (unique == NULL) {
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen;
char *tmp = pInfo->res + pInfo->num * size; char *tmp = pInfo->res + pInfo->num * size;
...@@ -5220,7 +5222,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK ...@@ -5220,7 +5222,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK
} }
} }
taosHashPut(pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*)); taosHashPut(*pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*));
pInfo->num++; pInfo->num++;
}else if(unique->timestamp > timestamp){ }else if(unique->timestamp > timestamp){
unique->timestamp = timestamp; unique->timestamp = timestamp;
......
...@@ -281,7 +281,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) { ...@@ -281,7 +281,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
int16_t offset = supporter->dataOffset; int32_t offset = supporter->dataOffset;
char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
char *in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); char *in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
...@@ -1964,8 +1964,12 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -1964,8 +1964,12 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->requireNull = false; pCtx->requireNull = false;
} }
pCtx->inputBytes = pSqlExpr->colBytes;
pCtx->inputType = pSqlExpr->colType; pCtx->inputType = pSqlExpr->colType;
if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && pSqlExpr->functionId == TSDB_FUNC_UNIQUE){
pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal;
}else{
pCtx->inputBytes = pSqlExpr->colBytes;
}
pCtx->ptsOutputBuf = NULL; pCtx->ptsOutputBuf = NULL;
...@@ -2030,8 +2034,6 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -2030,8 +2034,6 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == TSDB_FUNC_SCALAR_EXPR) { } else if (functionId == TSDB_FUNC_SCALAR_EXPR) {
pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i]; pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i];
} else if (functionId == TSDB_FUNC_UNIQUE){
pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
} }
...@@ -2056,9 +2058,6 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -2056,9 +2058,6 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
tVariantDestroy(&pCtx[i].tag); tVariantDestroy(&pCtx[i].tag);
tfree(pCtx[i].tagInfo.pTagCtxList); tfree(pCtx[i].tagInfo.pTagCtxList);
if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){
taosHashClear(pCtx[i].pUniqueSet);
}
} }
tfree(pCtx); tfree(pCtx);
...@@ -3688,6 +3687,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3688,6 +3687,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
RESET_RESULT_INFO(pCellInfo); RESET_RESULT_INFO(pCellInfo);
pCtx[i].resultInfo = pCellInfo; pCtx[i].resultInfo = pCellInfo;
pCtx[i].pUniqueSet = &pRow->uniqueHash;
pCtx[i].pOutput = pData->pData; pCtx[i].pOutput = pData->pData;
pCtx[i].currentStage = stage; pCtx[i].currentStage = stage;
assert(pCtx[i].pOutput != NULL); assert(pCtx[i].pOutput != NULL);
...@@ -4022,6 +4022,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -4022,6 +4022,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
pCtx[i].pUniqueSet = &pResult->uniqueHash;
SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; SResultRowCellInfo* pResInfo = pCtx[i].resultInfo;
if (pResInfo->initialized && pResInfo->complete) { if (pResInfo->initialized && pResInfo->complete) {
...@@ -4097,7 +4098,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF ...@@ -4097,7 +4098,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset);
offset += pCtx[i].outputBytes; offset += pCtx[i].outputBytes;
...@@ -4115,6 +4116,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF ...@@ -4115,6 +4116,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
* not all queries require the interResultBuf, such as COUNT * not all queries require the interResultBuf, such as COUNT
*/ */
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
pCtx[i].pUniqueSet = &pResult->uniqueHash;
} }
} }
......
...@@ -88,6 +88,9 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -88,6 +88,9 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
for(int32_t i = 0; i < pResultRowInfo->size; ++i) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]) { if (pResultRowInfo->pResult[i]) {
tfree(pResultRowInfo->pResult[i]->key); tfree(pResultRowInfo->pResult[i]->key);
if (pResultRowInfo->pResult[i]->uniqueHash){
taosHashCleanup(pResultRowInfo->pResult[i]->uniqueHash);
}
} }
} }
...@@ -153,11 +156,11 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 ...@@ -153,11 +156,11 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
if (pResultRow->pageId >= 0) { if (pResultRow->pageId >= 0) {
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes; int32_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes;
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
memset(s, 0, size); memset(s, 0, size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册