未验证 提交 a012fd2c 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #20991 from taosdata/test/TS-3069-MAIN

add the cast that select field include two udf function
...@@ -79,6 +79,26 @@ ENDIF () ...@@ -79,6 +79,26 @@ ENDIF ()
target_link_libraries( target_link_libraries(
udf1 PUBLIC os ${LINK_JEMALLOC}) udf1 PUBLIC os ${LINK_JEMALLOC})
add_library(udf1_dup STATIC MODULE test/udf1_dup.c)
target_include_directories(
udf1_dup
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(udf1_dup jemalloc)
ENDIF ()
target_link_libraries(
udf1_dup PUBLIC os ${LINK_JEMALLOC})
add_library(udf2 STATIC MODULE test/udf2.c) add_library(udf2 STATIC MODULE test/udf2.c)
target_include_directories( target_include_directories(
udf2 udf2
...@@ -99,6 +119,26 @@ target_link_libraries( ...@@ -99,6 +119,26 @@ target_link_libraries(
udf2 PUBLIC os ${LINK_JEMALLOC} udf2 PUBLIC os ${LINK_JEMALLOC}
) )
add_library(udf2_dup STATIC MODULE test/udf2_dup.c)
target_include_directories(
udf2_dup
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(udf2_dup jemalloc)
ENDIF ()
target_link_libraries(
udf2_dup PUBLIC os ${LINK_JEMALLOC}
)
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin) #SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c) add_executable(udfd src/udfd.c)
target_include_directories( target_include_directories(
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
DLL_EXPORT int32_t udf1_dup_init() { return 0; }
DLL_EXPORT int32_t udf1_dup_destroy() { return 0; }
DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if (j == block->numOfCols) {
int32_t luckyNum = 2;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
// to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
resultData->numOfRows = block->numOfRows;
return 0;
}
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT int32_t udf2_dup_init() { return 0; }
DLL_EXPORT int32_t udf2_dup_destroy() { return 0; }
DLL_EXPORT int32_t udf2_dup_start(SUdfInterBuf* buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 1;
return 0;
}
DLL_EXPORT int32_t udf2_dup(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
double sumSquares = 0;
if (interBuf->numOfResult == 1) {
sumSquares = *(double*)interBuf->buf;
}
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
continue;
}
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += (double)num * num;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
break;
}
default:
break;
}
++numNotNull;
}
}
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
if (interBuf->numOfResult == 0 && numNotNull == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0;
}
DLL_EXPORT int32_t udf2_dup_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares) + 100;
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
return 0;
}
...@@ -47,17 +47,27 @@ class TDTestCase: ...@@ -47,17 +47,27 @@ class TDTestCase:
if platform.system().lower() == 'windows': if platform.system().lower() == 'windows':
self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1_dup = subprocess.Popen('(for /r %s %%i in ("udf1_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2_dup = subprocess.Popen('(for /r %s %%i in ("udf2_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
if (not tdDnodes.dnodes[0].remoteIP == ""): if (not tdDnodes.dnodes[0].remoteIP == ""):
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\") tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1_dup.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\") tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2_dup.so',projPath+"\\debug\\build\\lib\\")
self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so') self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so')
self.libudf1_dup = self.libudf1_dup.replace('udf1_dup.dll','libudf1_dup.so')
self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so') self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so')
self.libudf2_dup = self.libudf2_dup.replace('udf2_dup.dll','libudf2_dup.so')
else: else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1_dup = subprocess.Popen('find %s -name "libudf1_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2_dup = subprocess.Popen('find %s -name "libudf2_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','') self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
self.libudf1_dup = self.libudf1_dup.replace('\r','').replace('\n','')
self.libudf2 = self.libudf2.replace('\r','').replace('\n','') self.libudf2 = self.libudf2.replace('\r','').replace('\n','')
self.libudf2_dup = self.libudf2_dup.replace('\r','').replace('\n','')
def prepare_data(self): def prepare_data(self):
...@@ -174,10 +184,12 @@ class TDTestCase: ...@@ -174,10 +184,12 @@ class TDTestCase:
# create scalar functions # create scalar functions
tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1) tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1)
tdSql.execute("create function udf1_dup as '%s' outputtype int;"%self.libudf1_dup)
# create aggregate functions # create aggregate functions
tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2)
tdSql.execute("create aggregate function udf2_dup as '%s' outputtype double bufSize 8;"%self.libudf2_dup)
functions = tdSql.getResult("show functions") functions = tdSql.getResult("show functions")
function_nums = len(functions) function_nums = len(functions)
...@@ -188,6 +200,13 @@ class TDTestCase: ...@@ -188,6 +200,13 @@ class TDTestCase:
# scalar functions # scalar functions
# udf1_dup
tdSql.query("select udf1(num1) ,udf1_dup(num1) from tb")
tdSql.checkData(1,0,1)
tdSql.checkData(1,1,2)
tdSql.checkData(2,0,1)
tdSql.checkData(2,1,2)
tdSql.execute("use db ") tdSql.execute("use db ")
tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
tdSql.checkData(0,0,None) tdSql.checkData(0,0,None)
...@@ -238,6 +257,10 @@ class TDTestCase: ...@@ -238,6 +257,10 @@ class TDTestCase:
# aggregate functions # aggregate functions
tdSql.query("select udf2(num1) ,udf2_dup(num1) from tb")
val = tdSql.queryResult[0][0] + 100
tdSql.checkData(0,1,val)
tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb") tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb")
tdSql.checkData(0,0,15.362291496) tdSql.checkData(0,0,15.362291496)
tdSql.checkData(0,1,10000949.553189287) tdSql.checkData(0,1,10000949.553189287)
......
...@@ -82,7 +82,7 @@ class TDTestCase: ...@@ -82,7 +82,7 @@ class TDTestCase:
tdSql.query("select * from %s.notifyinfo"%cdbName) tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 : if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1)) tdLog.info("row[0][1]: %d, row[1][1]: %d"%(tdSql.getData(0, 1), tdSql.getData(1, 1)))
if tdSql.getData(1, 1) == 1: if tdSql.getData(1, 1) == 1:
break break
time.sleep(0.1) time.sleep(0.1)
...@@ -122,6 +122,7 @@ class TDTestCase: ...@@ -122,6 +122,7 @@ class TDTestCase:
os.system(shellCmd) os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tdLog.info("start create tables......")
tsql.execute("create database if not exists %s vgroups %d wal_retention_period 3600"%(dbName, vgroups)) tsql.execute("create database if not exists %s vgroups %d wal_retention_period 3600"%(dbName, vgroups))
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
...@@ -137,11 +138,11 @@ class TDTestCase: ...@@ -137,11 +138,11 @@ class TDTestCase:
tsql.execute(sql) tsql.execute(sql)
event.set() event.set()
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.info("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............") tdLog.info("start to insert data ............")
tsql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
pre_insert = "insert into " pre_insert = "insert into "
sql = pre_insert sql = pre_insert
...@@ -163,7 +164,7 @@ class TDTestCase: ...@@ -163,7 +164,7 @@ class TDTestCase:
if sql != pre_insert: if sql != pre_insert:
#print("insert sql:%s"%sql) #print("insert sql:%s"%sql)
tsql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.info("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
...@@ -286,7 +287,7 @@ class TDTestCase: ...@@ -286,7 +287,7 @@ class TDTestCase:
prepareEnvThread.start() prepareEnvThread.start()
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_db1' topicName1 = 'topic_db11'
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0 consumerId = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册