提交 6cc2a721 编写于 作者: wmmhello's avatar wmmhello

modify unique function like top

上级 474c47b0
......@@ -242,7 +242,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
int32_t size);
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo);
......
......@@ -4000,7 +4000,9 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) ||
(functionId == TSDB_FUNC_SAMPLE) ||
(functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_HISTOGRAM)) {
(functionId == TSDB_FUNC_ELAPSED) ||
(functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -2615,7 +2615,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
}
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex,
int16_t type, int16_t size) {
int16_t type, int32_t size) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SExprInfo* pExpr = tscExprGet(pQueryInfo, index);
if (pExpr == NULL) {
......
......@@ -5193,15 +5193,15 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
return true;
}
static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag){
UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, pCtx->inputBytes);
static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes){
UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, bytes);
if (unique == NULL) {
size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen;
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen;
char *tmp = pInfo->res + pInfo->num * size;
((UniqueUnit*)tmp)->timestamp = timestamp;
char *data = tmp + sizeof(UniqueUnit);
char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes;
memcpy(data, pData, pCtx->inputBytes);
char *tags = tmp + sizeof(UniqueUnit) + bytes;
memcpy(data, pData, bytes);
if (pCtx->currentStage == MERGE_STAGE && tag != NULL) {
memcpy(tags, tag, (size_t)pCtx->tagInfo.tagsLen);
......@@ -5209,15 +5209,18 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK
int32_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j];
if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
aAggs[TSDB_FUNC_TAG].xFunction(tagCtx);
memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes);
offset += tagCtx->outputBytes;
if (tagCtx->functionId == TSDB_FUNC_TS_DUMMY) {
tagCtx->tag.nType = TSDB_DATA_TYPE_BIGINT;
tagCtx->tag.i64 = timestamp;
}
tVariantDump(&tagCtx->tag, tagCtx->pOutput, tagCtx->tag.nType, true);
memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes);
offset += tagCtx->outputBytes;
}
}
taosHashPut(pCtx->pUniqueSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*));
taosHashPut(pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*));
pInfo->num++;
}else if(unique->timestamp > timestamp){
unique->timestamp = timestamp;
......@@ -5233,7 +5236,7 @@ static void unique_function(SQLFunctionCtx *pCtx) {
if (pCtx->ptsList != NULL) {
k = GET_TS_DATA(pCtx, i);
}
do_unique_function(pCtx, pInfo, k, pData, NULL);
do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes);
if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
......@@ -5247,15 +5250,15 @@ static void unique_function(SQLFunctionCtx *pCtx) {
static void unique_function_merge(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx);
size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen;
size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pInput->num; ++i) {
char *tmp = pInput->res + i* size;
TSKEY timestamp = ((UniqueUnit*)tmp)->timestamp;
char *data = tmp + sizeof(UniqueUnit);
char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes;
do_unique_function(pCtx, pOutput, timestamp, data, tags);
char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes;
do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes);
if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){
if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
return;
}
......@@ -5772,7 +5775,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
// 39
"unique",
TSDB_FUNC_UNIQUE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNC_UNIQUE,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_SELECTIVITY,
unique_function_setup,
unique_function,
......
......@@ -33,7 +33,7 @@ typedef struct SCompSupporter {
} SCompSupporter;
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
if (pQueryAttr && (!stable)) { // if table is stable, no need return more than 1 no in merge stage
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM ||
......@@ -42,11 +42,11 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
}
}
if (pQueryAttr->uniqueQuery){
return MAX_UNIQUE_RESULT_ROWS;
}
}
if (pQueryAttr->uniqueQuery){
return MAX_UNIQUE_RESULT_ROWS;
}
return 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册