未验证 提交 2926d141 编写于 作者: G Ganlin Zhao 提交者: GitHub

Merge pull request #13784 from taosdata/enh/first_last_split

enh(query): add first/last function distributed implementation
...@@ -138,6 +138,10 @@ typedef enum EFunctionType { ...@@ -138,6 +138,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TOP_MERGE, FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL, FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE, FUNCTION_TYPE_BOTTOM_MERGE,
FUNCTION_TYPE_FIRST_PARTIAL,
FUNCTION_TYPE_FIRST_MERGE,
FUNCTION_TYPE_LAST_PARTIAL,
FUNCTION_TYPE_LAST_MERGE,
// user defined funcion // user defined funcion
FUNCTION_TYPE_UDF = 10000 FUNCTION_TYPE_UDF = 10000
......
...@@ -93,10 +93,14 @@ int32_t diffFunction(SqlFunctionCtx *pCtx); ...@@ -93,10 +93,14 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t lastFunctionMerge(SqlFunctionCtx *pCtx);
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
......
...@@ -954,6 +954,41 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l ...@@ -954,6 +954,41 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
// first(col_list) will be rewritten as first(col)
if (2 != LIST_LENGTH(pFunc->pParameterList)) { //input has two params c0,ts, is this a bug?
return TSDB_CODE_SUCCESS;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
uint8_t paraType = ((SExprNode*)pPara)->resType.type;
int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
if (isPartial) {
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The parameters of first/last can only be columns");
}
pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE,
.type = TSDB_DATA_TYPE_BINARY};
} else {
if (TSDB_DATA_TYPE_BINARY != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLastPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateFirstLastImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
...@@ -1704,6 +1739,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1704,6 +1739,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = firstFunction, .processFunc = firstFunction,
.finalizeFunc = firstLastFinalize, .finalizeFunc = firstLastFinalize,
.pPartialFunc = "_first_partial",
.pMergeFunc = "_first_merge",
.combineFunc = firstCombine,
},
{
.name = "_first_partial",
.type = FUNCTION_TYPE_FIRST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = firstLastPartialFinalize,
.combineFunc = firstCombine,
},
{
.name = "_first_merge",
.type = FUNCTION_TYPE_FIRST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = firstCombine, .combineFunc = firstCombine,
}, },
{ {
...@@ -1715,6 +1774,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1715,6 +1774,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = lastFunction, .processFunc = lastFunction,
.finalizeFunc = firstLastFinalize, .finalizeFunc = firstLastFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
},
{
.name = "_last_partial",
.type = FUNCTION_TYPE_LAST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastPartialFinalize,
.combineFunc = lastCombine,
},
{
.name = "_last_merge",
.type = FUNCTION_TYPE_LAST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine, .combineFunc = lastCombine,
}, },
{ {
......
...@@ -71,6 +71,12 @@ typedef struct STopBotRes { ...@@ -71,6 +71,12 @@ typedef struct STopBotRes {
STopBotResItem* pItems; STopBotResItem* pItems;
} STopBotRes; } STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
int32_t bytes;
char buf[];
} SFirstLastRes;
typedef struct SStddevRes { typedef struct SStddevRes {
double result; double result;
int64_t count; int64_t count;
...@@ -2230,9 +2236,13 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) ...@@ -2230,9 +2236,13 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getFirstLastInfoSize(int32_t resBytes) {
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pNode = (SColumnNode *)nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t);
return true; return true;
} }
...@@ -2256,12 +2266,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2256,12 +2266,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
...@@ -2279,7 +2290,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2279,7 +2290,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly // filter according to current result firstly
if (pResInfo->numOfRes > 0) { if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes); TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
if (ts < startKey) { if (ts < startKey) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2295,9 +2306,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2295,9 +2306,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
memcpy(buf, data, bytes); memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts; *(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
...@@ -2308,7 +2320,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2308,7 +2320,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
// in case of descending order time stamp serial, which usually happens as the results of the nest query, // in case of descending order time stamp serial, which usually happens as the results of the nest query,
// all data needs to be check. // all data needs to be check.
if (pResInfo->numOfRes > 0) { if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(buf + bytes); TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
if (ts < endKey) { if (ts < endKey) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2324,9 +2336,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2324,9 +2336,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
memcpy(buf, data, bytes); memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts; *(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
break; break;
...@@ -2342,12 +2355,13 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2342,12 +2355,13 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
...@@ -2372,10 +2386,11 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2372,10 +2386,11 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
memcpy(buf, data, bytes); memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts; *(TSKEY*)(pInfo->buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pInfo->hasResult = true;
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
} }
break; break;
...@@ -2390,9 +2405,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2390,9 +2405,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
memcpy(buf, data, bytes); memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts; *(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
...@@ -2404,6 +2420,56 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2404,6 +2420,56 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
if (!pInput->hasResult) {
return;
}
pOutput->bytes = pInput->bytes;
TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
if (pOutput->hasResult) {
if (isFirst) {
if (*tsIn > *tsOut) {
return;
}
} else {
if (*tsIn < *tsOut) {
return;
}
}
}
*tsOut = *tsIn;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
pOutput->hasResult = true;
return;
}
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuery) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data);
firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery);
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) {
return firstLastFunctionMergeImpl(pCtx, true);
}
int32_t lastFunctionMerge(SqlFunctionCtx *pCtx) {
return firstLastFunctionMergeImpl(pCtx, false);
}
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
...@@ -2411,12 +2477,30 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2411,12 +2477,30 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
char* in = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return 1;
}
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册