提交 aa8b12e2 编写于 作者: Y yihaoDeng

[TD-5797]<feature> support multi distinct

上级 bc07960f
...@@ -1948,10 +1948,10 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) { ...@@ -1948,10 +1948,10 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
&& (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) { && (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) {
return false; return false;
} }
if (tscNumOfExprs(pQueryInfo) == 1){ //if (tscNumOfExprs(pQueryInfo) == 1){
return true; // return true;
} //}
return false; return true;
} }
static bool hasNoneUserDefineExpr(SQueryInfo* pQueryInfo) { static bool hasNoneUserDefineExpr(SQueryInfo* pQueryInfo) {
......
...@@ -508,13 +508,21 @@ typedef struct SStateWindowOperatorInfo { ...@@ -508,13 +508,21 @@ typedef struct SStateWindowOperatorInfo {
bool reptScan; bool reptScan;
} SStateWindowOperatorInfo ; } SStateWindowOperatorInfo ;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo { typedef struct SDistinctOperatorInfo {
SHashObj *pSet; SHashObj *pSet;
SSDataBlock *pRes; SSDataBlock *pRes;
bool recordNullVal; //has already record the null value, no need to try again bool recordNullVal; //has already record the null value, no need to try again
int64_t threshold; int64_t threshold;
int64_t outputCapacity; int64_t outputCapacity;
int32_t colIndex; int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
struct SGlobalMerger; struct SGlobalMerger;
......
...@@ -44,6 +44,10 @@ ...@@ -44,6 +44,10 @@
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define MULTI_KEY_DELIM "-"
#define HASH_CAPACITY_LIMIT 10000000
#define TIME_WINDOW_COPY(_dst, _src) do {\ #define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\ (_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\ (_dst).ekey = (_src).ekey;\
...@@ -6109,6 +6113,8 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6109,6 +6113,8 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param;
taosHashCleanup(pInfo->pSet); taosHashCleanup(pInfo->pSet);
tfree(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
} }
...@@ -6600,20 +6606,65 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -6600,20 +6606,65 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
return pOperator; return pOperator;
} }
static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited
return true;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfBlock = taosArrayGetSize(pBlock->pDataBlock);
assert(i < numOfBlock);
for (int j = 0; j < numOfBlock; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resColId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
pInfo->totalBytes += strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
pInfo->buf = calloc(1, pInfo->totalBytes);
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
}
static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) {
char *p = pInfo->buf;
memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
memcpy(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
} else {
memcpy(p, val, pDistDataInfo->bytes);
p += pDistDataInfo->bytes;
}
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
p += strlen(MULTI_KEY_DELIM);
}
}
static SSDataBlock* hashDistinct(void* param, bool* newgroup) { static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SDistinctOperatorInfo* pInfo = pOperator->info; SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0; pRes->info.rows = 0;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
...@@ -6624,63 +6675,44 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6624,63 +6675,44 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
break; break;
} }
if (pInfo->colIndex == -1) { if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i);
if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) {
pInfo->colIndex = i;
break;
}
}
}
if (pInfo->colIndex == -1) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; break;
} }
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); // ensure result output buf
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
// ensure the output buffer size
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows; int32_t newSize = pRes->info.rows + pBlock->info.rows;
char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
if (tmp == NULL) { SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
return NULL; SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
} else { char* tmp = realloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
pResultColInfoData->pData = tmp; if (tmp == NULL) {
pInfo->outputCapacity = newSize; return NULL;
} else {
pResultColInfoData->pData = tmp;
}
} }
pInfo->outputCapacity = newSize;
} }
for(int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
char* val = ((char*)pColInfoData->pData) + bytes * i; buildMultiDistinctKey(pInfo, pBlock, i);
if (isNull(val, type)) { if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) {
continue; int32_t dummy;
} taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy));
char* p = val; for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
size_t keyLen = 0; SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src
tstr* var = (tstr*)(val); SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
p = var->data;
keyLen = varDataLen(var);
} else {
keyLen = bytes;
}
int dummy; char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
void* res = taosHashGet(pInfo->pSet, p, keyLen); char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
if (res == NULL) { memcpy(start, val, pDistDataInfo->bytes);
taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy)); }
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes);
pRes->info.rows += 1; pRes->info.rows += 1;
} }
} }
if (pRes->info.rows >= pInfo->threshold) { if (pRes->info.rows >= pInfo->threshold) {
break; break;
} }
...@@ -6691,11 +6723,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6691,11 +6723,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
pInfo->colIndex = -1; pInfo->totalBytes = 0;
pInfo->threshold = 10000000; // distinct result threshold pInfo->buf = NULL;
pInfo->outputCapacity = 4096; pInfo->threshold = HASH_CAPACITY_LIMIT; // distinct result threshold
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK); pInfo->outputCapacity = 4096;
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator"; pOperator->name = "DistinctOperator";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册