未验证 提交 61974b9f 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20605 from taosdata/fix/TD-23272

fix:add combine function for groupKey
......@@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
char* fmGetFuncName(int32_t funcId);
#ifdef __cplusplus
}
......
......@@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
}
} else if (pDestCtx[k].fpSet.combine == NULL) {
char* funName = fmGetFuncName(pDestCtx[k].functionId);
qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
taosMemoryFreeClear(funName);
}
}
}
......
......@@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
#ifdef __cplusplus
}
......
......@@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "irate",
.type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
......@@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = groupKeyFunction,
.finalizeFunc = groupKeyFinalize,
.combineFunc = groupKeyCombine,
.pPartialFunc = "_group_key",
.pMergeFunc = "_group_key"
},
......
......@@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
// escape rest of data blocks to avoid first entry to be overwritten.
if (pDBuf->hasResult) {
goto _group_key_over;
}
if (pSBuf->isNull) {
pDBuf->isNull = true;
pDBuf->hasResult = true;
goto _group_key_over;
}
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
memcpy(pDBuf->data, pSBuf->data,
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
} else {
memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
}
pDBuf->hasResult = true;
_group_key_over:
SET_VAL(pDResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
......
......@@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
return code;
}
char* fmGetFuncName(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return taosStrdup("invalid function");
}
return taosStrdup(funcMgtBuiltins[funcId].name);
}
......@@ -272,4 +272,122 @@ if $data12 != 2 then
goto loop3
endi
print ===== step3
sql drop database if exists test4;
sql create database test4 vgroups 10;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
sql create table aaa using st tags(1,1,1);
sql create table bbb using st tags(2,2,2);
sql create table ccc using st tags(3,2,2);
sql create table ddd using st tags(4,2,2);
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop4
endi
sql delete from aaa where ts = 1648791223003 ;
$loop_count = 0
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop5
endi
sql delete from ccc;
$loop_count = 0
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop6
endi
sql delete from ddd;
$loop_count = 0
loop7:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop7
endi
print ===== over
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册