未验证 提交 650334c7 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8369 from taosdata/feature/TD-10501

Feature/td 10501
...@@ -3112,10 +3112,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3112,10 +3112,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1);
SSchema s = {0}; SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
s.type = (uint8_t)resType;
s.bytes = bytes;
s.colId = pExpr->base.colInfo.colId;
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
...@@ -3123,7 +3120,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3123,7 +3120,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, pExpr); insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, pExpr);
} else { } else {
for (int32_t i = 0; i < ids.num; ++i) { for (int32_t i = 0; i < ids.num; ++i) {
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, &s); tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, pSchema);
} }
} }
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
...@@ -4227,7 +4224,11 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQuer ...@@ -4227,7 +4224,11 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQuer
// Append the sqlExpr into exprList of pQueryInfo structure sequentially // Append the sqlExpr into exprList of pQueryInfo structure sequentially
pExpr->functionId = isValidFunction(pExpr->Expr.operand.z, pExpr->Expr.operand.n); pExpr->functionId = isValidFunction(pExpr->Expr.operand.z, pExpr->Expr.operand.n);
if (pExpr->functionId < 0) { if (pExpr->functionId < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION; SUdfInfo* pUdfInfo = NULL;
pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pExpr->Expr.operand.z, pExpr->Expr.operand.n);
if (pUdfInfo == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "invalid function name");
}
} }
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false, NULL) != TSDB_CODE_SUCCESS) { if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false, NULL) != TSDB_CODE_SUCCESS) {
...@@ -5738,6 +5739,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5738,6 +5739,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
const char* msg8 = "only column in groupby clause allowed as order column"; const char* msg8 = "only column in groupby clause allowed as order column";
const char* msg9 = "orderby column must projected in subquery"; const char* msg9 = "orderby column must projected in subquery";
const char* msg10 = "not support distinct mixed with order by"; const char* msg10 = "not support distinct mixed with order by";
const char* msg11 = "not support order with udf";
setDefaultOrderInfo(pQueryInfo); setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -5777,6 +5779,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5777,6 +5779,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
SStrToken columnName = {pVar->nLen, pVar->nType, pVar->pz}; SStrToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
SColumnIndex index = COLUMN_INDEX_INITIALIZER; SColumnIndex index = COLUMN_INDEX_INITIALIZER;
bool udf = false;
if (pQueryInfo->pUdfInfo && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
int32_t usize = taosArrayGetSize(pQueryInfo->pUdfInfo);
for (int32_t i = 0; i < usize; ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_SCALAR) {
udf = true;
break;
}
}
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
...@@ -5832,6 +5847,9 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5832,6 +5847,9 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pQueryInfo->groupbyExpr.orderType = p1->sortOrder; pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} else if (isTopBottomQuery(pQueryInfo)) { } else if (isTopBottomQuery(pQueryInfo)) {
/* order of top/bottom query in interval is not valid */ /* order of top/bottom query in interval is not valid */
...@@ -5853,6 +5871,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5853,6 +5871,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} else { } else {
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
...@@ -5880,9 +5902,15 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5880,9 +5902,15 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} else if (orderByGroupbyCol){ } else if (orderByGroupbyCol){
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = index.columnIndex; pQueryInfo->order.orderColId = index.columnIndex;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} else { } else {
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} }
pItem = taosArrayGet(pSqlNode->pSortOrder, 1); pItem = taosArrayGet(pSqlNode->pSortOrder, 1);
...@@ -5918,6 +5946,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5918,6 +5946,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg7); return invalidOperationMsg(pMsgBuf, msg7);
} }
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder; pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
...@@ -5951,6 +5983,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5951,6 +5983,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
...@@ -5963,6 +5999,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5963,6 +5999,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg1); return invalidOperationMsg(pMsgBuf, msg1);
} }
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
...@@ -7298,6 +7338,11 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* ...@@ -7298,6 +7338,11 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
} }
if (f < 0) { if (f < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * f - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_SCALAR) {
return invalidOperationMsg(msg, msg1);
}
continue; continue;
} }
...@@ -7321,6 +7366,10 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* ...@@ -7321,6 +7366,10 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
return invalidOperationMsg(msg, msg1); return invalidOperationMsg(msg, msg1);
} }
if (IS_SCALAR_FUNCTION(aAggs[f].status)) {
return invalidOperationMsg(msg, msg1);
}
if (f == TSDB_FUNC_COUNT && pExpr->base.colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (f == TSDB_FUNC_COUNT && pExpr->base.colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(msg, msg1); return invalidOperationMsg(msg, msg1);
} }
...@@ -8559,12 +8608,33 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8559,12 +8608,33 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (functionId < 0) { if (functionId < 0) {
struct SUdfInfo info = {0}; struct SUdfInfo info = {0};
info.name = strndup(t->z, t->n); info.name = strndup(t->z, t->n);
info.keep = true;
if (pQueryInfo->pUdfInfo == NULL) { if (pQueryInfo->pUdfInfo == NULL) {
pQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo)); pQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
} else if (taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
int32_t usize = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo);
int32_t exist = 0;
for (int32_t j = 0; j < usize; ++j) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j);
int32_t len = strlen(pUdfInfo->name);
if (len == t->n && strncasecmp(info.name, pUdfInfo->name, t->n) == 0) {
exist = 1;
break;
}
}
if (exist) {
continue;
}
} }
info.functionId = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) * (-1) - 1;; info.functionId = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) * (-1) - 1;;
taosArrayPush(pQueryInfo->pUdfInfo, &info); taosArrayPush(pQueryInfo->pUdfInfo, &info);
if (taosArrayGetSize(pQueryInfo->pUdfInfo) > 1) {
code = tscInvalidOperationMsg(pCmd->payload, "only one udf allowed", NULL);
goto _end;
}
} }
} }
} }
......
...@@ -1102,6 +1102,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1102,6 +1102,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// support only one udf // support only one udf
if (pQueryInfo->pUdfInfo != NULL && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) { if (pQueryInfo->pUdfInfo != NULL && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
if (taosArrayGetSize(pQueryInfo->pUdfInfo) > 1) {
code = tscInvalidOperationMsg(pCmd->payload, "only one udf allowed", NULL);
goto _end;
}
pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload)); pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload));
for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i); SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i);
......
...@@ -1271,6 +1271,28 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1271,6 +1271,28 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
.pGroupList = taosArrayInit(1, POINTER_BYTES), .pGroupList = taosArrayInit(1, POINTER_BYTES),
}; };
SUdfInfo* pUdfInfo = NULL;
size_t size = tscNumOfExprs(px);
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExprInfo = tscExprGet(px, j);
int32_t functionId = pExprInfo->base.functionId;
if (functionId < 0) {
if (pUdfInfo) {
pSql->res.code = tscInvalidOperationMsg(pSql->cmd.payload, "only one udf allowed", NULL);
return;
}
pUdfInfo = taosArrayGet(px->pUdfInfo, -1 * functionId - 1);
int32_t code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
return;
}
}
}
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
...@@ -1352,6 +1374,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1352,6 +1374,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self); px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self);
px->pQInfo->runtimeEnv.udfIsCopy = true;
px->pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
tfree(pColumnInfo); tfree(pColumnInfo);
tfree(schema); tfree(schema);
...@@ -4800,9 +4825,14 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI ...@@ -4800,9 +4825,14 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
functionId = TSDB_FUNC_STDDEV; functionId = TSDB_FUNC_STDDEV;
} }
SUdfInfo* pUdfInfo = NULL;
if (functionId < 0) {
pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
}
int32_t inter = 0; int32_t inter = 0;
getResultDataInfo(pSource->base.colType, pSource->base.colBytes, functionId, 0, &pse->resType, getResultDataInfo(pSource->base.colType, pSource->base.colBytes, functionId, 0, &pse->resType,
&pse->resBytes, &inter, 0, false, NULL); &pse->resBytes, &inter, 0, false, pUdfInfo);
pse->colType = pse->resType; pse->colType = pse->resType;
pse->colBytes = pse->resBytes; pse->colBytes = pse->resBytes;
......
...@@ -233,6 +233,7 @@ int32_t isValidFunction(const char* name, int32_t len); ...@@ -233,6 +233,7 @@ int32_t isValidFunction(const char* name, int32_t len);
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0) #define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0) #define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0) #define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
#define IS_SCALAR_FUNCTION(x) (((x)&TSDB_FUNCSTATE_SCALAR) != 0)
// determine the real data need to calculated the result // determine the real data need to calculated the result
enum { enum {
......
...@@ -313,6 +313,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -313,6 +313,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj *pTableRetrieveTsMap;
SUdfInfo *pUdfInfo; SUdfInfo *pUdfInfo;
bool udfIsCopy;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
......
...@@ -51,6 +51,7 @@ typedef struct SUdfInfo { ...@@ -51,6 +51,7 @@ typedef struct SUdfInfo {
SUdfInit init; SUdfInit init;
char *content; char *content;
char *path; char *path;
bool keep;
} SUdfInfo; } SUdfInfo;
//script //script
......
...@@ -988,13 +988,12 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -988,13 +988,12 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) { if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output); (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, (int32_t *)&pCtx->resultInfo->numOfRes);
} else { } else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, (int32_t *)&pCtx->resultInfo->numOfRes, &pUdfInfo->init);
} }
// set the output value exist
pCtx->resultInfo->numOfRes = output; if (pCtx->resultInfo->numOfRes > 0) {
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG; pCtx->resultInfo->hasResult = DATA_SET_FLAG;
} }
...@@ -2409,7 +2408,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2409,7 +2408,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->sasArray);
} }
if (!pRuntimeEnv->udfIsCopy) {
destroyUdfInfo(pRuntimeEnv->pUdfInfo); destroyUdfInfo(pRuntimeEnv->pUdfInfo);
}
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv); doFreeQueryHandle(pRuntimeEnv);
...@@ -8039,7 +8040,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) { ...@@ -8039,7 +8040,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) {
} }
int32_t initUdfInfo(SUdfInfo* pUdfInfo) { int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) { if (pUdfInfo == NULL || pUdfInfo->handle) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//qError("script len: %d", pUdfInfo->contLen); //qError("script len: %d", pUdfInfo->contLen);
...@@ -8074,10 +8075,21 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -8074,10 +8075,21 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
// TODO check for failure of flush to disk // TODO check for failure of flush to disk
/*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file); /*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file);
fclose(file); fclose(file);
if (!pUdfInfo->keep) {
tfree(pUdfInfo->content); tfree(pUdfInfo->content);
}
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
pUdfInfo->path = strdup(path); pUdfInfo->path = strdup(path);
if (pUdfInfo->handle) {
taosCloseDll(pUdfInfo->handle);
}
pUdfInfo->handle = taosLoadDll(path); pUdfInfo->handle = taosLoadDll(path);
if (NULL == pUdfInfo->handle) { if (NULL == pUdfInfo->handle) {
...@@ -8092,9 +8104,17 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -8092,9 +8104,17 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_INIT)); pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_INIT));
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE));
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_MERGE)); pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_MERGE));
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] || NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
return TSDB_CODE_QRY_SYS_ERROR;
}
} else {
if (pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] || pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
return TSDB_CODE_QRY_SYS_ERROR;
}
} }
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_DESTROY)); pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_DESTROY));
...@@ -8201,7 +8221,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -8201,7 +8221,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
} }
int32_t param = (int32_t)pExprs[i].base.param[0].i64; int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId != TSDB_FUNC_ARITHM && if (pExprs[i].base.functionId > 0 && pExprs[i].base.functionId != TSDB_FUNC_ARITHM &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs); tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG; return TSDB_CODE_QRY_INVALID_MSG;
......
...@@ -210,10 +210,10 @@ class TDTestCase: ...@@ -210,10 +210,10 @@ class TDTestCase:
tdSql.query("select max(id) + 5 from tb1") tdSql.query("select max(id) + 5 from tb1")
tdSql.query("select max(id) + avg(val) from st") tdSql.query("select max(id) + avg(val) from st")
tdSql.query("select max(id) + avg(val) from tb1") tdSql.query("select max(id) + avg(val) from tb1")
tdSql.error("select abs_max(number) + 5 from st") tdSql.query("select abs_max(number) + 5 from st")
tdSql.error("select abs_max(number) + 5 from tb1") tdSql.query("select abs_max(number) + 5 from tb1")
tdSql.error("select abs_max(number) + max(id) from st") tdSql.error("select abs_max(number) + max(id) from st")
tdSql.error("select abs_max(number)*abs_max(val) from st") tdSql.query("select abs_max(number)*abs_max(val) from st")
tdLog.info("======= UDF Nested query test =======") tdLog.info("======= UDF Nested query test =======")
tdSql.query("select sum(id) from (select id from st)") tdSql.query("select sum(id) from (select id from st)")
......
...@@ -452,6 +452,7 @@ if $data31 != 2 then ...@@ -452,6 +452,7 @@ if $data31 != 2 then
return -1 return -1
endi endi
sql_error select add_one(f1) from tb1 order by ts desc;
sql select add_one(f1) from tb1 limit 2; sql select add_one(f1) from tb1 limit 2;
if $rows != 2 then if $rows != 2 then
......
...@@ -10,9 +10,10 @@ sql connect ...@@ -10,9 +10,10 @@ sql connect
print ======================== dnode1 start print ======================== dnode1 start
sql create function add_one as '/tmp/add_one.so' outputtype int; sql create function add_one as '/tmp/add_one.so' outputtype int;
sql create function add_one_64232 as '/tmp/add_one_64232.so' outputtype int;
sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype int; sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype int;
sql show functions; sql show functions;
if $rows != 2 then if $rows != 3 then
return -1 return -1
endi endi
...@@ -1154,6 +1155,93 @@ if $data61 != 22 then ...@@ -1154,6 +1155,93 @@ if $data61 != 22 then
return -1 return -1
endi endi
sql_error select sum_double(f1),add_one(f1) from tb1 where ts>="2021-03-23 17:00:00.000" and ts<="2021-03-24 20:00:00.000" interval (1h) sliding (30m);
sql select add_one(f1) from (select * from tb1);
if $rows != 7 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data10 != 3 then
return -1
endi
if $data20 != 4 then
return -1
endi
if $data30 != 5 then
return -1
endi
if $data40 != 6 then
return -1
endi
if $data50 != 7 then
return -1
endi
if $data60 != 8 then
return -1
endi
sql select add_one(ff1) from (select add_one(f1) as ff1 from tb1);
if $rows != 7 then
return -1
endi
if $data00 != 3 then
return -1
endi
if $data10 != 4 then
return -1
endi
if $data20 != 5 then
return -1
endi
if $data30 != 6 then
return -1
endi
if $data40 != 7 then
return -1
endi
if $data50 != 8 then
return -1
endi
if $data60 != 9 then
return -1
endi
sql_error select add_one(f1),sub_one(f1) from tb1;
sql create table taaa (ts timestamp, f1 bigint);
sql insert into taaa values (now, 1);
sleep 100
sql insert into taaa values (now, 10);
sleep 100
sql insert into taaa values (now, 1000);
sleep 100
sql insert into taaa values (now, 100);
sql select add_one_64232(f1) from taaa;
if $rows != 4 then
print $rows
return -1
endi
if $data00 != 2 then
return -1
endi
if $data10 != 11 then
return -1
endi
if $data20 != 1001 then
return -1
endi
if $data30 != 101 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
......
...@@ -415,4 +415,8 @@ cd ../../../debug; make ...@@ -415,4 +415,8 @@ cd ../../../debug; make
./test.sh -f general/parser/last_cache.sim ./test.sh -f general/parser/last_cache.sim
./test.sh -f unique/big/balance.sim ./test.sh -f unique/big/balance.sim
./test.sh -f general/parser/udf.sim
./test.sh -f general/parser/udf_dll.sim
./test.sh -f general/parser/udf_dll_stable.sim
#======================b7-end=============== #======================b7-end===============
...@@ -38,6 +38,8 @@ void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts ...@@ -38,6 +38,8 @@ void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts
*(long *)dataOutput=r; *(long *)dataOutput=r;
printf("abs_max out, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput); printf("abs_max out, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
} else {
*numOfOutput=0;
} }
} }
...@@ -47,7 +49,7 @@ void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfIn ...@@ -47,7 +49,7 @@ void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfIn
int i; int i;
int r = 0; int r = 0;
printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf); printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
*numOfOutput=1;
printf("abs_max finalize, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput); printf("abs_max finalize, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
} }
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
void add_one_64232(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("add_one_64232 input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 5) {
for(i=0;i<numOfRows;++i) {
printf("input %d - %ld", i, *((long *)data + i));
*((int *)dataOutput+i)=(int)*((long *)data + i) + 1;
printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
printf("add_one_64232 out, numOfOutput:%d\n", *numOfOutput);
}
}
...@@ -9,5 +9,7 @@ touch /tmp/normal ...@@ -9,5 +9,7 @@ touch /tmp/normal
gcc -g -O0 -fPIC -shared sh/sum_double.c -o /tmp/sum_double.so gcc -g -O0 -fPIC -shared sh/sum_double.c -o /tmp/sum_double.so
gcc -g -O0 -fPIC -shared sh/add_one.c -o /tmp/add_one.so gcc -g -O0 -fPIC -shared sh/add_one.c -o /tmp/add_one.so
gcc -g -O0 -fPIC -shared sh/add_one_64232.c -o /tmp/add_one_64232.so
gcc -g -O0 -fPIC -shared sh/sub_one.c -o /tmp/sub_one.so
gcc -g -O0 -fPIC -shared sh/demo.c -o /tmp/demo.so gcc -g -O0 -fPIC -shared sh/demo.c -o /tmp/demo.so
gcc -g -O0 -fPIC -shared sh/abs_max.c -o /tmp/abs_max.so gcc -g -O0 -fPIC -shared sh/abs_max.c -o /tmp/abs_max.so
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
void sub_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("sub_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
for(i=0;i<numOfRows;++i) {
printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) - 1;
printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
printf("sub_one out, numOfOutput:%d\n", *numOfOutput);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册