未验证 提交 d70c74d4 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #11735 from taosdata/3.0_udfd

feat(udf): udf scalar api change
...@@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) { ...@@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
return 0; return 0;
} }
pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2); uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0; pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
pData->startCalled = true;
pData->needCleanUp = true; pData->needCleanUp = true;
return pData->spawnErr; return pData->spawnErr;
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <stdbool.h> #include <stdbool.h>
#include "tmsg.h" #include "tmsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "function.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn ...@@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
/** /**
* tearn down udf * tearn down udf
* @param handle * @param handle
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "fnLog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tarray.h" #include "tarray.h"
...@@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { ...@@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
return 0; return 0;
} }
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
output->info.rows = input->numOfRows;
output->info.numOfCols = numOfCols;
bool hasVarCol = false;
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) {
hasVarCol = true;
break;
}
}
output->info.hasVarCol = hasVarCol;
//TODO: free the array output->pDataBlock
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArrayPush(output->pDataBlock, input->columnData);
return 0;
}
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
if (input->info.numOfCols != 1) {
fnError("scalar function only support one column");
return -1;
}
output->numOfRows = input->info.rows;
//TODO: memory
output->columnData = taosArrayGet(input->pDataBlock, 0);
return 0;
}
void onUdfcPipeClose(uv_handle_t *handle) { void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
...@@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn ...@@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
return err; return err;
} }
// input: block int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
// output: resultData
int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); SSDataBlock inputBlock = {0};
convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
SSDataBlock resultBlock = {0};
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
convertDataBlockToScalarParm(&resultBlock, output);
return err; return err;
} }
......
...@@ -34,7 +34,7 @@ typedef struct SUdfdContext { ...@@ -34,7 +34,7 @@ typedef struct SUdfdContext {
void *clientRpc; void *clientRpc;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj* udfsHash; SHashObj *udfsHash;
bool printVersion; bool printVersion;
} SUdfdContext; } SUdfdContext;
...@@ -55,12 +55,7 @@ typedef struct SUvUdfWork { ...@@ -55,12 +55,7 @@ typedef struct SUvUdfWork {
uv_buf_t output; uv_buf_t output;
} SUvUdfWork; } SUvUdfWork;
typedef enum { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
UDF_STATE_INIT = 0,
UDF_STATE_LOADING,
UDF_STATE_READY,
UDF_STATE_UNLOADING
} EUdfState;
typedef struct SUdf { typedef struct SUdf {
int32_t refCount; int32_t refCount;
...@@ -83,15 +78,19 @@ typedef struct SUdfcFuncHandle { ...@@ -83,15 +78,19 @@ typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfcFuncHandle; } SUdfcFuncHandle;
int32_t udfdLoadUdf(char* udfName, SUdf* udf) { int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf);
int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) {
strcpy(udf->name, udfName); strcpy(udf->name, udfName);
udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf);
int err = uv_dlopen(udf->path, &udf->lib); int err = uv_dlopen(udf->path, &udf->lib);
if (err != 0) { if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
// TODO set error // TODO set error
} }
//TODO: find all the functions // TODO: find all the functions
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(normalFuncName, udfName); strcpy(normalFuncName, udfName);
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
...@@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) {
switch (request.type) { switch (request.type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
//TODO: tracable id from client. connect, setup, call, teardown // TODO: tracable id from client. connect, setup, call, teardown
fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName); fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
SUdfSetupRequest *setup = &request.setup; SUdfSetupRequest *setup = &request.setup;
SUdf* udf = NULL; SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN);
if (*udfInHash) { if (*udfInHash) {
++(*udfInHash)->refCount; ++(*udfInHash)->refCount;
udf = *udfInHash; udf = *udfInHash;
...@@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf); udfdLoadUdf(setup->udfName, &setup->epSet, udf);
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
...@@ -168,7 +167,8 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -168,7 +167,8 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call; SUdfCallRequest *call = &request.call;
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
call->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
...@@ -206,8 +206,8 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -206,8 +206,8 @@ void udfdProcessRequest(uv_work_t *req) {
} }
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown; SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
...@@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) {
void udfdOnWrite(uv_write_t *req, int status) { void udfdOnWrite(uv_write_t *req, int status) {
SUvUdfWork *work = (SUvUdfWork *)req->data; SUvUdfWork *work = (SUvUdfWork *)req->data;
if (status < 0) { if (status < 0) {
//TODO:log error and process it. // TODO:log error and process it.
} }
fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
taosMemoryFree(work->output.base); taosMemoryFree(work->output.base);
...@@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { ...@@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) { int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
...@@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu ...@@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0) { if (nread < 0) {
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
uv_close((uv_handle_t*)q, NULL); uv_close((uv_handle_t *)q, NULL);
uv_stop(global.loop); uv_stop(global.loop);
return; return;
} }
...@@ -521,7 +521,7 @@ static int32_t removeListeningPipe() { ...@@ -521,7 +521,7 @@ static int32_t removeListeningPipe() {
} }
static int32_t udfdUvInit() { static int32_t udfdUvInit() {
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) { if (loop) {
uv_loop_init(loop); uv_loop_init(loop);
} }
...@@ -529,7 +529,7 @@ static int32_t udfdUvInit() { ...@@ -529,7 +529,7 @@ static int32_t udfdUvInit() {
uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_init(global.loop, &global.ctrlPipe, 1);
uv_pipe_open(&global.ctrlPipe, 0); uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0}; char dnodeId[8] = {0};
size_t dnodeIdSize; size_t dnodeIdSize;
...@@ -567,7 +567,7 @@ static int32_t udfdRun() { ...@@ -567,7 +567,7 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex); uv_mutex_init(&global.udfsMutex);
//TOOD: client rpc to fetch udf function info from mnode // TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) { if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure"); fnError("open rpc connection to mnode failure");
return -1; return -1;
...@@ -589,7 +589,7 @@ static int32_t udfdRun() { ...@@ -589,7 +589,7 @@ static int32_t udfdRun() {
return code; return code;
} }
int main(int argc, char* argv[]) { int main(int argc, char *argv[]) {
if (!taosCheckSystemIsSmallEnd()) { if (!taosCheckSystemIsSmallEnd()) {
printf("failed to start since on non-small-end machines\n"); printf("failed to start since on non-small-end machines\n");
return -1; return -1;
......
...@@ -45,11 +45,14 @@ int main(int argc, char *argv[]) { ...@@ -45,11 +45,14 @@ int main(int argc, char *argv[]) {
taosArrayPush(pBlock->pDataBlock, &colInfo); taosArrayPush(pBlock->pDataBlock, &colInfo);
} }
SSDataBlock output = {0}; SScalarParam input = {0};
callUdfScalaProcess(handle, pBlock, &output); input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1 , &output);
SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0); SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.info.rows; ++i) { for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t))); fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
} }
teardownUdf(handle); teardownUdf(handle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册