提交 d156bff7 编写于 作者: S slzhou

enhance: support resident functions

上级 75e88133
......@@ -29,7 +29,7 @@
#include "trpc.h"
// clang-foramt on
SArray* udfdResidentFuncs;
SArray* udfdResidentFuncs = NULL;
typedef struct SUdfdContext {
uv_loop_t * loop;
......@@ -69,6 +69,7 @@ typedef struct SUdf {
EUdfState state;
uv_mutex_t lock;
uv_cond_t condReady;
bool resident;
char name[TSDB_FUNC_NAME_LEN];
int8_t funcType;
......@@ -202,6 +203,14 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if (udf->initFunc) {
udf->initFunc();
}
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(udfdResidentFuncs); ++i) {
char* funcName = taosArrayGet(udfdResidentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) {
udf->resident = true;
break;
}
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
......@@ -347,7 +356,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
if (udf->refCount == 0 && !udf->resident) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
}
......@@ -944,10 +953,28 @@ void udfdConnectMnodeThreadFunc(void *args) {
}
int32_t udfdInitResidentFuncs() {
udfdResidentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
char gpd[TSDB_FUNC_NAME_LEN] = "gpd";
taosArrayPush(udfdResidentFuncs, gpd);
char gpdBatch[TSDB_FUNC_NAME_LEN] = "gpdbatch";
taosArrayPush(udfdResidentFuncs, gpdBatch);
return TSDB_CODE_SUCCESS;
}
int32_t udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(udfdResidentFuncs); ++i) {
char* funcName = taosArrayGet(udfdResidentFuncs, i);
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
SUdf* udf = *udfInHash;
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -40,10 +40,6 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", "gpd", 6030);
taos_query(taos, "create st (ts timestamp, f int) tags(t int)");
taos_query(taos, "insert into t using st tags(1) values(now, 1) ");
taos_query(taos, "select * from gpd.t");
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
DLL_EXPORT int32_t gpd_init() {
return 0;
}
DLL_EXPORT int32_t gpd_destroy() {
return 0;
}
DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if ( j == block->numOfCols) {
int32_t luckyNum = 88;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
taos_init();
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 7100);
if (taos == NULL) {
char* errstr = "can not connect";
}
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) ");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "select * from gpd.t");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_close(taos);
taos_cleanup();
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册