提交 e06dce48 编写于 作者: D dapan1121

support stable udf

上级 34b2f7b8
...@@ -133,13 +133,13 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo); ...@@ -133,13 +133,13 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo); bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo); int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsTwoStageSTableQuery(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo); bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
......
...@@ -300,7 +300,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -300,7 +300,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType; param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0);
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator); pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
if (pReducer->pLoserTree == NULL || pRes->code != 0) { if (pReducer->pLoserTree == NULL || pRes->code != 0) {
...@@ -377,7 +377,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -377,7 +377,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->pDesc->pColumnModel->capacity = 1; pReducer->pDesc->pColumnModel->capacity = 1;
// restore the limitation value at the last stage // restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit; pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset; pQueryInfo->limit.offset = pQueryInfo->prjOffset;
} }
...@@ -561,7 +561,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm ...@@ -561,7 +561,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
} }
// primary timestamp column is involved in final result // primary timestamp column is involved in final result
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
numOfGroupByCols++; numOfGroupByCols++;
} }
...@@ -751,6 +751,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -751,6 +751,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
type = pModel->pFields[i].field.type; type = pModel->pFields[i].field.type;
bytes = pModel->pFields[i].field.bytes; bytes = pModel->pFields[i].field.bytes;
} else if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
type = pUdfInfo->resType;
bytes = pUdfInfo->resBytes;
} else { } else {
if (functionId == TSDB_FUNC_FIRST_DST) { if (functionId == TSDB_FUNC_FIRST_DST) {
functionId = TSDB_FUNC_FIRST; functionId = TSDB_FUNC_FIRST;
...@@ -1082,6 +1086,10 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n ...@@ -1082,6 +1086,10 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
} }
pCtx->currentStage = MERGE_STAGE; pCtx->currentStage = MERGE_STAGE;
if (functionId < 0) {
continue;
}
if (needInit) { if (needInit) {
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo); aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
} }
...@@ -1093,7 +1101,24 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n ...@@ -1093,7 +1101,24 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
continue; continue;
} }
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]); if (functionId < 0) {
int32_t output = 0;
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
assert (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE);
if (pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE]) {
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pLocalMerge->pCtx[j].pInput, pLocalMerge->pCtx[j].size, pLocalMerge->pCtx[j].pOutput, &output, &pUdfInfo->init);
// set the output value exist
pLocalMerge->pCtx[j].resultInfo->numOfRes = output;
if (output > 0) {
pLocalMerge->pCtx[j].resultInfo->hasResult = DATA_SET_FLAG;
}
}
} else {
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
}
} }
} }
...@@ -1168,12 +1193,30 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo ...@@ -1168,12 +1193,30 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
free(buf); free(buf);
} }
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) { int32_t finalizeRes(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) { for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k]; SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
aAggs[pCtx->functionId].xFinalize(pCtx);
if (pCtx->functionId < 0) {
int32_t output = 0;
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pCtx->functionId - 1);
assert (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE);
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, &output, &pUdfInfo->init);
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
}
} else {
aAggs[pCtx->functionId].xFinalize(pCtx);
}
} }
pLocalMerge->hasPrevRow = false; pLocalMerge->hasPrevRow = false;
...@@ -1192,13 +1235,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) { ...@@ -1192,13 +1235,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
* results generated by simple aggregation function, we merge them all into one points * results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure * *Exception*: column projection query, required no merge procedure
*/ */
bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { bool needToMerge(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default int32_t ret = 0; // merge all result by default
int16_t functionId = pLocalMerge->pCtx[0].functionId; int16_t functionId = pLocalMerge->pCtx[0].functionId;
// todo opt performance // todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
ret = 1; // disable merge procedure ret = 1; // disable merge procedure
} else { } else {
tOrderDescriptor *pDesc = pLocalMerge->pDesc; tOrderDescriptor *pDesc = pLocalMerge->pDesc;
...@@ -1522,7 +1565,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1522,7 +1565,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
} }
if (pLocalMerge->hasPrevRow) { if (pLocalMerge->hasPrevRow) {
if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) { if (needToMerge(pCmd, pQueryInfo, pLocalMerge, tmpBuffer)) {
// belong to the group of the previous row, continue process it // belong to the group of the previous row, continue process it
doExecuteFinalMerge(pCmd, pLocalMerge, false); doExecuteFinalMerge(pCmd, pLocalMerge, false);
...@@ -1533,7 +1576,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1533,7 +1576,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* current row does not belong to the group of previous row. * current row does not belong to the group of previous row.
* so the processing of previous group is completed. * so the processing of previous group is completed.
*/ */
int32_t numOfRes = finalizeRes(pQueryInfo, pLocalMerge); int32_t numOfRes = finalizeRes(pCmd, pQueryInfo, pLocalMerge);
bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer); bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer);
tFilePage *pResBuf = pLocalMerge->pResultBuf; tFilePage *pResBuf = pLocalMerge->pResultBuf;
...@@ -1605,7 +1648,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1605,7 +1648,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
} }
if (pLocalMerge->hasPrevRow) { if (pLocalMerge->hasPrevRow) {
finalizeRes(pQueryInfo, pLocalMerge); finalizeRes(pCmd, pQueryInfo, pLocalMerge);
} }
if (pLocalMerge->pResultBuf->num) { if (pLocalMerge->pResultBuf->num) {
......
...@@ -5193,7 +5193,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* ...@@ -5193,7 +5193,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode*
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
// orderby ts query on super table // orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
addPrimaryTsColIntoResult(pQueryInfo); addPrimaryTsColIntoResult(pQueryInfo);
} }
} }
...@@ -5817,13 +5817,13 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5817,13 +5817,13 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// todo refactor // todo refactor
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
// for projection query on super table, all queries are subqueries // for projection query on super table, all queries are subqueries
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) {
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
} }
...@@ -5859,7 +5859,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5859,7 +5859,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->vgroupLimit = -1; pQueryInfo->vgroupLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
/* /*
* the offset value should be removed during retrieve data from virtual node, since the * the offset value should be removed during retrieve data from virtual node, since the
* global order are done in client side, so the offset is applied at the client side * global order are done in client side, so the offset is applied at the client side
...@@ -6462,7 +6462,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6462,7 +6462,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
} }
// projection query on super table does not compatible with "group by" syntax // projection query on super table does not compatible with "group by" syntax
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
......
...@@ -2485,7 +2485,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2485,7 +2485,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) { } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { } else if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} }
......
...@@ -560,7 +560,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { ...@@ -560,7 +560,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
return true; return true;
} }
...@@ -673,7 +673,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -673,7 +673,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (!tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
return; return;
} }
...@@ -724,7 +724,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -724,7 +724,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0); assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
} else { } else {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "qUdf.h"
#include "qUtil.h" #include "qUtil.h"
typedef struct SInsertSupporter { typedef struct SInsertSupporter {
...@@ -653,7 +654,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -653,7 +654,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pVgroupTables != NULL); assert(pTableMetaInfo->pVgroupTables != NULL);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscNonOrderedProjectionQueryOnSTable(&pNew->cmd, pQueryInfo, 0)) {
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables); SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
pTableMetaInfo->pVgroupTables = p; pTableMetaInfo->pVgroupTables = p;
...@@ -1442,7 +1443,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1442,7 +1443,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
SSubqueryState* pState = &pParentSql->subState; SSubqueryState* pState = &pParentSql->subState;
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && numOfRows == 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
...@@ -1479,7 +1480,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1479,7 +1480,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
// update the records for each subquery in parent sql object. // update the records for each subquery in parent sql object.
bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0); bool stableQuery = tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0);
for (int32_t i = 0; i < pState->numOfSub; ++i) { for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) { if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i); tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
...@@ -1576,7 +1577,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1576,7 +1577,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0); SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0); orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, p, 0);
if (orderedPrjQuery) { if (orderedPrjQuery) {
break; break;
} }
...@@ -1601,7 +1602,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1601,7 +1602,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && if (tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) { pSub->res.completed) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
...@@ -1804,7 +1805,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1804,7 +1805,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
} }
// In case of consequence query from other vnode, do not wait for other query response here. // In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
...@@ -1816,7 +1817,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1816,7 +1817,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker * data instead of returning to its invoker
*/ */
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0)) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
...@@ -2791,6 +2792,22 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -2791,6 +2792,22 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SQLFunctionCtx *pCtx = &pParentSql->res.pLocalMerger->pCtx[j];
int32_t functionId = pCtx->functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pParentSql->cmd.pUdfInfo, -1 * functionId - 1);
int32_t code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
}
}
}
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pParentSql->res.code == TSDB_CODE_SUCCESS) { if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
...@@ -2864,7 +2881,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -2864,7 +2881,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql, tscDebug("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx); pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { SSqlCmd* pCmd = &pSql->cmd;
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql, pSql, tsMaxNumOfOrderedResults, num); pParentSql, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
...@@ -3430,7 +3448,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3430,7 +3448,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
bool allSubqueryExhausted = true; bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
......
...@@ -113,7 +113,7 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) { ...@@ -113,7 +113,7 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) { if (pQueryInfo == NULL) {
return false; return false;
} }
...@@ -128,7 +128,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -128,7 +128,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
} }
// for ordered projection query, iterate all qualified vnodes sequentially // for ordered projection query, iterate all qualified vnodes sequentially
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) { if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
return false; return false;
} }
...@@ -139,7 +139,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -139,7 +139,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false; return false;
} }
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
/* /*
...@@ -156,6 +156,15 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -156,6 +156,15 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
continue;
}
if (functionId != TSDB_FUNC_PRJ && if (functionId != TSDB_FUNC_PRJ &&
functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAG &&
...@@ -171,8 +180,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -171,8 +180,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
} }
// not order by timestamp projection query on super table // not order by timestamp projection query on super table
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
return false; return false;
} }
...@@ -180,8 +189,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI ...@@ -180,8 +189,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
return pQueryInfo->order.orderColId < 0; return pQueryInfo->order.orderColId < 0;
} }
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
return false; return false;
} }
...@@ -659,6 +668,30 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { ...@@ -659,6 +668,30 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
return NULL; return NULL;
} }
void freeUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) {
return;
}
if (pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY]) {
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init);
}
tfree(pUdfInfo->name);
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
tfree(pUdfInfo->content);
taosCloseDll(pUdfInfo->handle);
}
void* tscDestroyUdfArrayList(SArray* pUdfList) { void* tscDestroyUdfArrayList(SArray* pUdfList) {
if (pUdfList == NULL) { if (pUdfList == NULL) {
return NULL; return NULL;
...@@ -667,10 +700,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) { ...@@ -667,10 +700,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) {
size_t size = taosArrayGetSize(pUdfList); size_t size = taosArrayGetSize(pUdfList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SUdfInfo* udf = taosArrayGet(pUdfList, i); SUdfInfo* udf = taosArrayGet(pUdfList, i);
if (udf) { freeUdfInfo(udf);
tfree(udf->content);
tfree(udf->name);
}
} }
taosArrayDestroy(pUdfList); taosArrayDestroy(pUdfList);
...@@ -2440,7 +2470,7 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -2440,7 +2470,7 @@ void tscDoQuery(SSqlObj* pSql) {
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // secondary stage join query. } else { // secondary stage join query.
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql); tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock); tscUnlockByThread(&pSql->squeryLock);
...@@ -2454,7 +2484,7 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -2454,7 +2484,7 @@ void tscDoQuery(SSqlObj* pSql) {
} else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) { } else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) {
tscHandleFirstRoundStableQuery(pSql); // todo lock? tscHandleFirstRoundStableQuery(pSql); // todo lock?
return; return;
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query } else if (tscIsTwoStageSTableQuery(pCmd, pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql); tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock); tscUnlockByThread(&pSql->squeryLock);
...@@ -2619,7 +2649,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { ...@@ -2619,7 +2649,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
} }
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && return tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
} }
...@@ -2637,7 +2667,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2637,7 +2667,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* no result returned from the current virtual node anymore, try the next vnode if exists * no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query * if case of: multi-vnode super table projection query
*/ */
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#ifndef TDENGINE_QUDF_H #ifndef TDENGINE_QUDF_H
#define TDENGINE_QUDF_H #define TDENGINE_QUDF_H
enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_FINALIZE, TSDB_UDF_FUNC_DESTROY, TSDB_UDF_FUNC_MAX_NUM }; enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_FINALIZE, TSDB_UDF_FUNC_MERGE, TSDB_UDF_FUNC_DESTROY, TSDB_UDF_FUNC_MAX_NUM };
...@@ -43,16 +43,15 @@ typedef struct SUdfInfo { ...@@ -43,16 +43,15 @@ typedef struct SUdfInfo {
void *handle; // handle loaded in mem void *handle; // handle loaded in mem
void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
SUdfInit init; SUdfInit init;
union { // file path or [in memory] binary content char *content;
char *content; char *path;
char *path;
};
} SUdfInfo; } SUdfInfo;
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
int32_t* numOfOutput, int16_t oType, int16_t oByte, SUdfInit* buf); int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
typedef int32_t (*udfInitFunc)(SUdfInit* data); typedef int32_t (*udfInitFunc)(SUdfInit* data);
typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf); typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
typedef void (*udfDestroyFunc)(SUdfInit* buf); typedef void (*udfDestroyFunc)(SUdfInit* buf);
......
...@@ -90,4 +90,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -90,4 +90,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset); int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -6008,6 +6008,8 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -6008,6 +6008,8 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) {
} }
tfree(pUdfInfo->path); tfree(pUdfInfo->path);
tfree(pUdfInfo->content);
taosCloseDll(pUdfInfo->handle); taosCloseDll(pUdfInfo->handle);
...@@ -6027,6 +6029,9 @@ static char* getUdfFuncName(char* name, int type) { ...@@ -6027,6 +6029,9 @@ static char* getUdfFuncName(char* name, int type) {
case TSDB_UDF_FUNC_FINALIZE: case TSDB_UDF_FUNC_FINALIZE:
sprintf(funcname, "%s_finalize", name); sprintf(funcname, "%s_finalize", name);
break; break;
case TSDB_UDF_FUNC_MERGE:
sprintf(funcname, "%s_merge", name);
break;
case TSDB_UDF_FUNC_DESTROY: case TSDB_UDF_FUNC_DESTROY:
sprintf(funcname, "%s_destroy", name); sprintf(funcname, "%s_destroy", name);
break; break;
...@@ -6038,7 +6043,7 @@ static char* getUdfFuncName(char* name, int type) { ...@@ -6038,7 +6043,7 @@ static char* getUdfFuncName(char* name, int type) {
return funcname; return funcname;
} }
static int32_t initUdfInfo(SUdfInfo* pUdfInfo) { int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) { if (pUdfInfo == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -6088,6 +6093,7 @@ static int32_t initUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -6088,6 +6093,7 @@ static int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE));
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_MERGE));
} }
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_DESTROY)); pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_DESTROY));
......
...@@ -10,12 +10,12 @@ typedef struct SUdfInit{ ...@@ -10,12 +10,12 @@ typedef struct SUdfInit{
int const_item; /* 0 if result is independent of arguments */ int const_item; /* 0 if result is independent of arguments */
} SUdfInit; } SUdfInit;
void add_one(char* data, char type, int numOfRows, long long* ts, char* dataOutput, char* tsOutput, void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* tsOutput,
int* numOfOutput, SUdfInit* buf) { int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i; int i;
int r = 0; int r = 0;
printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, type, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf); printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (type == 4) { if (itype == 4) {
for(i=0;i<numOfRows;++i) { for(i=0;i<numOfRows;++i) {
printf("input %d - %d", i, *((int *)data + i)); printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) + 1; *((int *)dataOutput+i)=*((int *)data + i) + 1;
......
...@@ -35,27 +35,27 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ...@@ -35,27 +35,27 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
void sum_double_finalize(char* dataOutput, int* numOfOutput, SUdfInit* buf) { void sum_double_finalize(char* dataOutput, int* numOfOutput, SUdfInit* buf) {
int i; int i;
int r = 0; int r = 0;
printf("sum_double_finalize dataoutput:%p, numOfOutput:%p, buf:%p\n", dataOutput, numOfOutput, buf); printf("sum_double_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
*numOfOutput=1; *numOfOutput=1;
*(int*)(buf->ptr)=*(int*)dataOutput*2; *(int*)(buf->ptr)=*(int*)dataOutput*2;
*(int*)dataOutput=*(int*)(buf->ptr); *(int*)dataOutput=*(int*)(buf->ptr);
printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput); printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
} }
void sum_double_merge(char* dataOutput, int* numOfOutput, SUdfInit* buf) { void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int r = 0; int r = 0;
int sum = 0; int sum = 0;
printf("sum_double_merge dataoutput:%p, numOfOutput:%p, buf:%p\n", dataOutput, numOfOutput, buf); printf("sum_double_merge dataoutput:%p, numOfOutput:%d, buf:%p\n", dataOutput, *numOfOutput, buf);
for (int i = 0; i < *numOfOutput; ++i) { for (int i = 0; i < numOfRows; ++i) {
printf("sum_double_merge %d - %d\n", i, *((int*)dataOutput + i)); printf("sum_double_merge %d - %d\n", i, *((int*)data + i));
sum +=*((int*)dataOutput + i); sum +=*((int*)data + i);
} }
*(int*)dataOutput=sum; *(int*)dataOutput+=sum;
*numOfOutput=1; *numOfOutput=1;
printf("sum_double sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput); printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册