未验证 提交 a6cc171d 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20498 from taosdata/szhou/python-udf

enhance: add taospyudf test and run it in CI
......@@ -241,7 +241,7 @@ typedef struct SUvUdfWork {
struct SUvUdfWork *pWorkNext;
} SUvUdfWork;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY} EUdfState;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
typedef struct SUdf {
char name[TSDB_FUNC_NAME_LEN + 1];
......@@ -439,7 +439,7 @@ int32_t udfdInitScriptPlugin(int8_t scriptType) {
return err;
}
break;
}
}
default:
fnError("udf script type %d not supported", scriptType);
taosMemoryFree(plugin);
......@@ -509,15 +509,15 @@ int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
int32_t code = taosRenameFile(udf->path, newPath);
if (code == 0) {
sprintf(udf->path, "%s", newPath);
}
return 0;
}
......@@ -1358,7 +1358,7 @@ int32_t udfdDeinitResidentFuncs() {
char *funcName = taosArrayGet(global.residentFuncs, i);
SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
SUdf *udf = *udfInHash;
SUdf *udf = *udfInHash;
int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code);
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
......
......@@ -177,6 +177,7 @@
,,y,script,./test.sh -f tsim/query/scalarNull.sim
,,y,script,./test.sh -f tsim/query/session.sim
,,y,script,./test.sh -f tsim/query/udf.sim
,,n,script,./test.sh -f tsim/query/udfpy.sim
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
......
......@@ -51,6 +51,9 @@ if [ $ent -eq 0 ]; then
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so.1 2>/dev/null
ln -s /home/TDengine/include/client/taos.h /usr/include/taos.h 2>/dev/null
ln -s /home/TDengine/include/common/taosdef.h /usr/include/taosdef.h 2>/dev/null
ln -s /home/TDengine/include/util/taoserror.h /usr/include/taoserror.h 2>/dev/null
ln -s /home/TDengine/include/libs/function/taosudf.h /usr/include/taosudf.h 2>/dev/null
CONTAINER_TESTDIR=/home/TDengine
else
export PATH=$PATH:/home/TDinternal/debug/build/bin
......@@ -58,6 +61,9 @@ else
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so.1 2>/dev/null
ln -s /home/TDinternal/community/include/client/taos.h /usr/include/taos.h 2>/dev/null
ln -s /home/TDinternal/community/include/common/taosdef.h /usr/include/taosdef.h 2>/dev/null
ln -s /home/TDinternal/community/include/util/taoserror.h /usr/include/taoserror.h 2>/dev/null
ln -s /home/TDinternal/community/include/libs/function/taosudf.h /usr/include/taosudf.h 2>/dev/null
CONTAINER_TESTDIR=/home/TDinternal/community
fi
mkdir -p /var/lib/taos/subscribe
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT int32_t bit_and_init() { return 0; }
DLL_EXPORT int32_t bit_and_init() {
return 0;
}
DLL_EXPORT int32_t bit_and_destroy() {
return 0;
}
DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn *resultCol) {
DLL_EXPORT int32_t bit_and_destroy() { return 0; }
if (block->numOfCols < 2) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) {
if (block->numOfCols < 2) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
if (udfColDataIsNull(block->udfCols[0], i)) {
udfColDataSetNull(resultCol, i);
continue;
}
int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
int j = 1;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
char* colData = udfColDataGetData(block->udfCols[j], i);
result &= *(int32_t*)colData;
}
if (j == block->numOfCols) {
udfColDataSet(resultCol, i, (char*)&result, false);
}
SUdfColumnData* resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) {
if (udfColDataIsNull(block->udfCols[0], i)) {
udfColDataSetNull(resultCol, i);
continue;
}
return TSDB_CODE_SUCCESS;
int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
int j = 1;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
char* colData = udfColDataGetData(block->udfCols[j], i);
result &= *(int32_t*)colData;
}
if (j == block->numOfCols) {
udfColDataSet(resultCol, i, (char*)&result, false);
}
}
resultData->numOfRows = block->numOfRows;
return TSDB_CODE_SUCCESS;
}
......@@ -16,7 +16,7 @@ DLL_EXPORT int32_t l2norm_destroy() {
DLL_EXPORT int32_t l2norm_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
buf->numOfResult = 1;
return 0;
}
......@@ -58,20 +58,11 @@ DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInte
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
if (interBuf->numOfResult == 0 && numNotNull == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
newInterBuf->numOfResult = 1;
return 0;
}
DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double);
......
#!/bin/bash
set +e
FILE=/usr/local/lib/libtaospyudf.so
if [ ! -f "$FILE" ]; then
echo "$FILE does not exist."
apt install -y python3 python3-dev python3-venv
/usr/bin/python3 -m venv /udfenv
source /udfenv/bin/activate
pip3 install taospyudf
ldconfig
deactivate
else
echo "show dependencies of $FILE"
ldd $FILE
fi
def init():
pass
def process(block):
(rows, cols) = block.shape()
result = []
for i in range(rows):
r = 2 ** 32 - 1
for j in range(cols):
cell = block.data(i,j)
if cell is None:
result.append(None)
break
else:
r = r & cell
else:
result.append(r)
return result
def destroy():
pass
import json
import math
def init():
pass
def destroy():
pass
def start():
return json.dumps(0.0).encode('utf-8')
def finish(buf):
sum_squares = json.loads(buf)
result = math.sqrt(sum_squares)
return result
def reduce(datablock, buf):
(rows, cols) = datablock.shape()
sum_squares = json.loads(buf)
for i in range(rows):
for j in range(cols):
cell = datablock.data(i,j)
if cell is not None:
sum_squares += cell * cell
return json.dumps(sum_squares).encode('utf-8')
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step1 udf
system sh/compile_udf.sh
system sh/prepare_pyudf.sh
system mkdir -p /tmp/pyudf
system cp sh/pybitand.py /tmp/pyudf/
system cp sh/pyl2norm.py /tmp/pyudf/
system ls /tmp/pyudf
sql create database udf vgroups 3;
sql use udf;
sql select * from information_schema.ins_databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
if $system_content == Windows_NT then
sql create function bit_and as 'C:\\Windows\\Temp\\bitand.dll' outputtype int bufSize 8;
sql create aggregate function l2norm as 'C:\\Windows\\Temp\\l2norm.dll' outputtype double bufSize 8;
else
sql create function bit_and as '/tmp/udf/libbitand.so' outputtype int bufSize 8;
sql create aggregate function l2norm as '/tmp/udf/libl2norm.so' outputtype double bufSize 8;
endi
sql create function pybitand as '/tmp/pyudf/pybitand.py' outputtype int language 'python';
sql create aggregate function pyl2norm as '/tmp/pyudf/pyl2norm.py' outputtype double bufSize 128 language 'python';
sql show functions;
if $rows != 4 then
return -1
endi
sql select bit_and(f, f) from t;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
sql select pybitand(f, f) from t;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
sql select l2norm(f) from t;
if $rows != 1 then
print expect 1, actual $rows
return -1
endi
if $data00 != 2.236067977 then
return -1
endi
sql select pyl2norm(f) from t;
if $rows != 1 then
print expect 1, actual $rows
return -1
endi
if $data00 != 2.236067977 then
return -1
endi
sql create table t2 (ts timestamp, f1 int, f2 int);
sql insert into t2 values(now, 0, 0)(now+1s, 1, 1);
sql select bit_and(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
sql select l2norm(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
sql select pybitand(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
sql select pyl2norm(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
sql select bit_and(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select l2norm(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
endi
if $data00 != 2.645751311 then
return -1
endi
sql select pybitand(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select pyl2norm(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
endi
if $data00 != 2.645751311 then
return -1
endi
sql insert into t2 values(now+4s, 4, 8)(now+5s, 5, 9);
sql select l2norm(f1-f2), l2norm(f1+f2) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1;
endi
if $data00 != 5.656854249 then
return -1
endi
if $data01 != 18.547236991 then
return -1
endi
sql select l2norm(bit_and(f2, f1)), l2norm(bit_and(f1, f2)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
if $data01 != 1.414213562 then
return -1
endi
sql select l2norm(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
print $rows , $data00 , $data10 , $data20
if $rows != 3 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 9.055385138 then
return -1
endi
if $data20 != 8.000000000 then
return -1
endi
sql select pyl2norm(f1-f2), pyl2norm(f1+f2) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1;
endi
if $data00 != 5.656854249 then
return -1
endi
if $data01 != 18.547236991 then
return -1
endi
sql select pyl2norm(pybitand(f2, f1)), pyl2norm(pybitand(f1, f2)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
if $data01 != 1.414213562 then
return -1
endi
sql select pyl2norm(f2) from udf.t2 group by 1-pybitand(f1, f2) order by 1-pybitand(f1,f2);
print $rows , $data00 , $data10 , $data20
if $rows != 3 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 9.055385138 then
return -1
endi
if $data20 != 8.000000000 then
return -1
endi
#sql drop function bit_and;
#sql show functions;
#if $rows != 1 then
# return -1
#endi
#if $data00 != @l2norm@ then
# return -1
# endi
#sql drop function l2norm;
#sql show functions;
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册