未验证 提交 004e9dab 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16927 from taosdata/szhou/cenc

fix: add udfd resident funcs that teardown when udfd exits
......@@ -120,6 +120,7 @@ extern SDiskCfg tsDiskCfg[];
// udf
extern bool tsStartUdfd;
extern char tsUdfdResFuncs[];
// schemaless
extern char tsSmlChildTableName[];
......
......@@ -163,6 +163,7 @@ int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -421,6 +422,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
}
......@@ -717,6 +719,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
......
......@@ -41,6 +41,8 @@ typedef struct SUdfdContext {
uv_mutex_t udfsMutex;
SHashObj * udfsHash;
SArray* residentFuncs;
bool printVersion;
} SUdfdContext;
......@@ -67,6 +69,7 @@ typedef struct SUdf {
EUdfState state;
uv_mutex_t lock;
uv_cond_t condReady;
bool resident;
char name[TSDB_FUNC_NAME_LEN];
int8_t funcType;
......@@ -200,6 +203,14 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if (udf->initFunc) {
udf->initFunc();
}
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) {
udf->resident = true;
break;
}
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
......@@ -345,7 +356,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
if (udf->refCount == 0 && !udf->resident) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
}
......@@ -576,9 +587,9 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc));
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc));
}
return 0;
}
......@@ -919,8 +930,6 @@ static int32_t udfdRun() {
uv_run(global.loop, UV_RUN_DEFAULT);
uv_loop_close(global.loop);
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
}
......@@ -941,6 +950,47 @@ void udfdConnectMnodeThreadFunc(void *args) {
}
}
int32_t udfdInitResidentFuncs() {
if (strlen(tsUdfdResFuncs) == 0) {
return TSDB_CODE_SUCCESS;
}
global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
char* pSave = tsUdfdResFuncs;
char* token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN] = {0};
strncpy(func, token, strlen(token));
taosArrayPush(global.residentFuncs, func);
}
return TSDB_CODE_SUCCESS;
}
int32_t udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
SUdf* udf = *udfInHash;
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
}
taosArrayDestroy(global.residentFuncs);
return TSDB_CODE_SUCCESS;
}
int32_t udfdCleanup() {
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
}
int main(int argc, char *argv[]) {
if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n");
......@@ -978,6 +1028,8 @@ int main(int argc, char *argv[]) {
return -5;
}
udfdInitResidentFuncs();
uv_thread_t mnodeConnectThread;
uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);
......@@ -986,5 +1038,7 @@ int main(int argc, char *argv[]) {
removeListeningPipe();
udfdCloseClientRpc();
udfdDeinitResidentFuncs();
udfdCleanup();
return 0;
}
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
TAOS* taos = NULL;
DLL_EXPORT int32_t gpd_init() {
taos = taos_connect("localhost", "root", "taosdata", "", 7100);
return 0;
}
DLL_EXPORT int32_t gpd_destroy() {
taos_close(taos);
taos_cleanup();
return 0;
}
DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if ( j == block->numOfCols) {
int32_t luckyNum = 88;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) ");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "select * from gpd.t");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0;
}
......@@ -144,18 +144,18 @@ if $data20 != 8.000000000 then
return -1
endi
sql drop function bit_and;
sql show functions;
if $rows != 1 then
return -1
endi
if $data00 != @l2norm@ then
return -1
endi
sql drop function l2norm;
sql show functions;
if $rows != 0 then
return -1
endi
#sql drop function bit_and;
#sql show functions;
#if $rows != 1 then
# return -1
#endi
#if $data00 != @l2norm@ then
# return -1
# endi
#sql drop function l2norm;
#sql show functions;
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册