提交 177b1c96 编写于 作者: S slzhou

udfd final program ongoing

上级 b62b1786
...@@ -59,6 +59,7 @@ extern int32_t sDebugFlag; ...@@ -59,6 +59,7 @@ extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag; extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag; extern int32_t tqDebugFlag;
extern int32_t fsDebugFlag; extern int32_t fsDebugFlag;
extern int32_t fnDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles); int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog(); void taosCloseLog();
......
...@@ -284,6 +284,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { ...@@ -284,6 +284,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1;
return 0; return 0;
} }
...@@ -464,6 +465,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { ...@@ -464,6 +465,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32; tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32; tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32; fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
} }
static int32_t taosSetClientCfg(SConfig *pCfg) { static int32_t taosSetClientCfg(SConfig *pCfg) {
......
//
// Created by slzhou on 22-4-20.
//
#ifndef TDENGINE_FNLOG_H
#define TDENGINE_FNLOG_H
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
#define fnFatal(...) { if (fnDebugFlag & DEBUG_FATAL) { taosPrintLog("FN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define fnError(...) { if (fnDebugFlag & DEBUG_ERROR) { taosPrintLog("FN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define fnWarn(...) { if (fnDebugFlag & DEBUG_WARN) { taosPrintLog("FN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define fnInfo(...) { if (fnDebugFlag & DEBUG_INFO) { taosPrintLog("FN ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define fnDebug(...) { if (fnDebugFlag & DEBUG_DEBUG) { taosPrintLog("FN ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define fnTrace(...) { if (fnDebugFlag & DEBUG_TRACE) { taosPrintLog("FN ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_FNLOG_H
...@@ -26,6 +26,9 @@ ...@@ -26,6 +26,9 @@
extern "C" { extern "C" {
#endif #endif
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock."
//====================================================================================== //======================================================================================
//begin API to taosd and qworker //begin API to taosd and qworker
......
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "tlog.h" #include "fnLog.h"
#include "thash.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
...@@ -25,336 +25,381 @@ ...@@ -25,336 +25,381 @@
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
static uv_loop_t *loop; typedef struct SUdfdContext {
uv_loop_t *loop;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
void *clientRpc;
uv_mutex_t udfsMutex;
SHashObj* udfsHash;
bool printVersion;
} SUdfdContext;
SUdfdContext global;
typedef struct SUdfdUvConn { typedef struct SUdfdUvConn {
uv_stream_t *client; uv_stream_t *client;
char *inputBuf; char *inputBuf;
int32_t inputLen; int32_t inputLen;
int32_t inputCap; int32_t inputCap;
int32_t inputTotal; int32_t inputTotal;
} SUdfdUvConn; } SUdfdUvConn;
typedef struct SUvUdfWork { typedef struct SUvUdfWork {
uv_stream_t *client; uv_stream_t *client;
uv_buf_t input; uv_buf_t input;
uv_buf_t output; uv_buf_t output;
} SUvUdfWork; } SUvUdfWork;
typedef struct SUdf { typedef enum {
int32_t refCount; UDF_STATE_INIT = 0,
UDF_STATE_LOADING,
UDF_STATE_READY,
UDF_STATE_UNLOADING
} EUdfState;
char name[16]; typedef struct SUdf {
int8_t type; int32_t refCount;
EUdfState state;
uv_lib_t lib; uv_mutex_t lock;
TUdfScalarProcFunc scalarProcFunc; uv_cond_t condReady;
TUdfFreeUdfColumnFunc freeUdfColumn;
char name[16];
int8_t type;
char path[PATH_MAX];
uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnFunc freeUdfColumn;
} SUdf; } SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
//TODO: add private udf structure. // TODO: add private udf structure.
typedef struct SUdfHandle { typedef struct SUdfHandle {
SUdf *udf; SUdf *udf;
} SUdfHandle; } SUdfHandle;
int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
strcpy(udf->name, udfName);
void udfdProcessRequest(uv_work_t *req) { // TODO: retrive udf info from mnode
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data); char *path = "libudf1.so";
SUdfRequest request = {0}; int err = uv_dlopen(path, &udf->lib);
decodeUdfRequest(uvUdf->input.base, &request); if (err != 0) {
fnError("can not load library %s. error: %s", path, uv_strerror(err));
switch (request.type) { // TODO set error
case UDF_TASK_SETUP: { }
debugPrint("%s", "process setup request");
SUdf *udf = taosMemoryMalloc(sizeof(SUdf));
udf->refCount = 0;
SUdfSetupRequest *setup = &request.setup;
strcpy(udf->name, setup->udfName);
//TODO: retrive udf info from mnode
char* path = "libudf1.so";
int err = uv_dlopen(path, &udf->lib);
if (err != 0) {
debugPrint("can not load library %s. error: %s", path, uv_strerror(err));
//TODO set error
}
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(normalFuncName, setup->udfName);
//TODO error, multi-thread, same udf, lock it
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
handle->udf = udf;
udf->refCount++;
//TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp;
rsp.seqNum = request.seqNum;
rsp.type = request.type;
rsp.code = 0;
rsp.setupRsp.udfHandle = (int64_t) (handle);
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
}
case UDF_TASK_CALL: { char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
debugPrint("%s", "process call request"); strcpy(normalFuncName, udfName);
SUdfCallRequest *call = &request.call; // TODO error, multi-thread, same udf, lock it
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle); // TODO find all functions normal, init, destroy, normal, merge, finalize
SUdf *udf = handle->udf; uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
SUdfDataBlock input = {0}; char *freeSuffix = "_free";
convertDataBlockToUdfDataBlock(&call->block, &input); strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
SUdfColumn output = {0}; strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
//TODO: call different functions according to call type, for now just calar uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { return 0;
udf->scalarProcFunc(input, &output); }
}
SUdfResponse response = {0};
SUdfResponse *rsp = &response;
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
SUdfCallResponse *subRsp = &rsp->callRsp;
subRsp->callType = call->callType;
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
}
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
//TODO: free
udf->freeUdfColumn(&output);
taosMemoryFree(uvUdf->input.base);
break;
}
case UDF_TASK_TEARDOWN: {
debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = &request.teardown;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
SUdf *udf = handle->udf;
udf->refCount--;
if (udf->refCount == 0) {
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
//TODO: call destroy and free udf private
taosMemoryFree(handle);
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
}
default: {
break;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
//TODO: tracable id from client. connect, setup, call, teardown
fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName);
SUdfSetupRequest *setup = &request.setup;
SUdf* udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN);
if (*udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN, &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf);
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
uv_mutex_unlock(&udf->lock);
}
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
handle->udf = udf;
// TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp;
rsp.seqNum = request.seqNum;
rsp.type = request.type;
rsp.code = 0;
rsp.setupRsp.udfHandle = (int64_t)(handle);
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
} }
case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call;
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle);
SUdfHandle *handle = (SUdfHandle *)(call->udfHandle);
SUdf *udf = handle->udf;
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfColumn output = {0};
// TODO: call different functions according to call type, for now just calar
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output);
}
SUdfResponse response = {0};
SUdfResponse *rsp = &response;
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
SUdfCallResponse *subRsp = &rsp->callRsp;
subRsp->callType = call->callType;
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
}
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
// TODO: free udf column
udf->freeUdfColumn(&output);
taosMemoryFree(uvUdf->input.base);
break;
}
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle)
SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf;
bool unloadUdf = false;
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, TSDB_FUNC_NAME_LEN);
}
uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) {
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
// TODO: call destroy and free udf private
taosMemoryFree(handle);
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
break;
}
default: {
break;
}
}
} }
void udfdOnWrite(uv_write_t *req, int status) { void udfdOnWrite(uv_write_t *req, int status) {
debugPrint("%s", "server after writing to pipe"); SUvUdfWork *work = (SUvUdfWork *)req->data;
if (status < 0) { if (status < 0) {
debugPrint("Write error %s", uv_err_name(status)); //TODO:log error and process it.
} }
SUvUdfWork *work = (SUvUdfWork *) req->data; fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
debugPrint("\tlength: %zu", work->output.len); taosMemoryFree(work->output.base);
taosMemoryFree(work->output.base); taosMemoryFree(work);
taosMemoryFree(work); taosMemoryFree(req);
taosMemoryFree(req);
} }
void udfdSendResponse(uv_work_t *work, int status) { void udfdSendResponse(uv_work_t *work, int status) {
debugPrint("%s", "send response"); SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
SUvUdfWork *udfWork = (SUvUdfWork *) (work->data);
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
write_req->data = udfWork; write_req->data = udfWork;
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
taosMemoryFree(work); taosMemoryFree(work);
} }
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "server allocate buffer for read"); SUdfdUvConn *ctx = handle->data;
SUdfdUvConn *ctx = handle->data; int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); if (ctx->inputCap == 0) {
if (ctx->inputCap == 0) { ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
ctx->inputBuf = taosMemoryMalloc(msgHeadSize); if (ctx->inputBuf) {
if (ctx->inputBuf) { ctx->inputLen = 0;
ctx->inputLen = 0; ctx->inputCap = msgHeadSize;
ctx->inputCap = msgHeadSize; ctx->inputTotal = -1;
ctx->inputTotal = -1;
buf->base = ctx->inputBuf;
buf->base = ctx->inputBuf; buf->len = ctx->inputCap;
buf->len = ctx->inputCap;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
} else { } else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; // TODO: log error
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); buf->base = NULL;
if (inputBuf) { buf->len = 0;
ctx->inputBuf = inputBuf;
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
} }
debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal); } else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
if (inputBuf) {
ctx->inputBuf = inputBuf;
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
// TODO: log error
buf->base = NULL;
buf->len = 0;
}
}
fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
} }
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) { if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
pipe->inputTotal = *(int32_t *) (pipe->inputBuf); pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
} }
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) { if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
return true; fnDebug("receive request complete. length %d", pipe->inputLen);
} return true;
return false; }
return false;
} }
void udfdHandleRequest(SUdfdUvConn *conn) { void udfdHandleRequest(SUdfdUvConn *conn) {
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
udfWork->client = conn->client; udfWork->client = conn->client;
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen); udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
conn->inputBuf = NULL; conn->inputBuf = NULL;
conn->inputLen = 0; conn->inputLen = 0;
conn->inputCap = 0; conn->inputCap = 0;
conn->inputTotal = -1; conn->inputTotal = -1;
work->data = udfWork; work->data = udfWork;
uv_queue_work(loop, work, udfdProcessRequest, udfdSendResponse); uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse);
} }
void udfdPipeCloseCb(uv_handle_t *pipe) { void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data; SUdfdUvConn *conn = pipe->data;
taosMemoryFree(conn->client); taosMemoryFree(conn->client);
taosMemoryFree(conn->inputBuf); taosMemoryFree(conn->inputBuf);
taosMemoryFree(conn); taosMemoryFree(conn);
} }
void udfdUvHandleError(SUdfdUvConn *conn) { void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
uv_close((uv_handle_t *) conn->client, udfdPipeCloseCb);
}
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "read from pipe", nread); fnDebug("udf read %zu bytes from client", nread);
if (nread == 0) return;
if (nread == 0) return;
SUdfdUvConn *conn = client->data; SUdfdUvConn *conn = client->data;
if (nread > 0) { if (nread > 0) {
conn->inputLen += nread; conn->inputLen += nread;
if (isUdfdUvMsgComplete(conn)) { if (isUdfdUvMsgComplete(conn)) {
udfdHandleRequest(conn); udfdHandleRequest(conn);
} else { } else {
//log error or continue; // log error or continue;
}
return;
} }
return;
}
if (nread < 0) { if (nread < 0) {
debugPrint("Read error %s", uv_err_name(nread)); fnDebug("Receive error %s", uv_err_name(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
//TODO check more when close // TODO check more when close
} else { } else {
}
udfdUvHandleError(conn);
} }
udfdUvHandleError(conn);
}
} }
void udfdOnNewConnection(uv_stream_t *server, int status) { void udfdOnNewConnection(uv_stream_t *server, int status) {
debugPrint("%s", "on new connection"); fnDebug("new connection");
if (status < 0) { if (status < 0) {
// TODO // TODO
return; return;
} }
uv_pipe_t *client = (uv_pipe_t *) taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(loop, client, 0); uv_pipe_init(global.loop, client, 0);
if (uv_accept(server, (uv_stream_t *) client) == 0) { if (uv_accept(server, (uv_stream_t *)client) == 0) {
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
ctx->client = (uv_stream_t *) client; ctx->client = (uv_stream_t *)client;
ctx->inputBuf = 0; ctx->inputBuf = 0;
ctx->inputLen = 0; ctx->inputLen = 0;
ctx->inputCap = 0; ctx->inputCap = 0;
client->data = ctx; client->data = ctx;
ctx->client = (uv_stream_t *) client; ctx->client = (uv_stream_t *)client;
uv_read_start((uv_stream_t *) client, udfdAllocBuffer, udfdPipeRead); uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
} else { } else {
uv_close((uv_handle_t *) client, NULL); uv_close((uv_handle_t *)client, NULL);
} }
} }
void removeListeningPipe(int sig) { void removeListeningPipe(int sig) {
uv_fs_t req; uv_fs_t req;
uv_fs_unlink(loop, &req, "udf.sock", NULL); uv_fs_unlink(global.loop, &req, "udf.sock", NULL);
exit(0); exit(0);
} }
typedef struct SServerContext { void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
void *clientRpc;
} SUdfdContext;
void udfdProcessRpcRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) {
return;
}
int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int32_t numOfUdfs) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
for (int32_t i = 0; i < numOfUdfs; ++i) { taosArrayPush(retrieveReq.pFuncNames, udfName);
taosArrayPush(retrieveReq.pFuncNames, udfNames[i]);
}
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames); taosArrayDestroy(retrieveReq.pFuncNames);
...@@ -368,66 +413,176 @@ int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int3 ...@@ -368,66 +413,176 @@ int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int3
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp); tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo* pFuncInfo = (SFuncInfo*)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
char path[PATH_MAX] = {0};
taosGetTmpfilePath("/tmp", "libudf", path);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos); taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
return 0; return 0;
} }
int32_t openUdfdClientRpc(SUdfdContext *ctx) { int32_t udfdOpenClientRpc() {
char *pass = "taosdata"; char *pass = "taosdata";
char *user = "root"; char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = (char*)"UDFD"; rpcInit.label = (char *)"UDFD";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp; rpcInit.cfp = udfdProcessRpcRsp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000; rpcInit.idleTime = 30 * 1000;
rpcInit.parent = ctx; rpcInit.parent = &global;
rpcInit.user = (char*)user; rpcInit.user = (char *)user;
rpcInit.ckey = (char*)"key"; rpcInit.ckey = (char *)"key";
rpcInit.secret = (char*)secretEncrypt; rpcInit.secret = (char *)secretEncrypt;
rpcInit.spi = 1; rpcInit.spi = 1;
ctx->clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
return 0;
}
int32_t udfdCloseClientRpc() {
rpcClose(global.clientRpc);
return 0;
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
}
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow");
return -1;
}
tstrncpy(configDir, argv[i], PATH_MAX);
} else {
printf("'-c' requires a parameter, default is %s\n", configDir);
return -1;
}
} else if (strcmp(argv[i], "-V") == 0) {
global.printVersion = true;
} else {
}
}
return 0; return 0;
} }
int32_t closeUdfdClientRpc(SUdfdContext *ctx) { static int32_t udfdInitLog() {
rpcClose(ctx->clientRpc); char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", "udfd");
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
}
static int32_t udfdUvInit() {
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) {
uv_loop_init(loop);
}
global.loop = loop;
char dnodeId[8] = {0};
size_t dnodeIdSize;
uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
char listenPipeName[32] = {0};
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
strcpy(global.listenPipeName, listenPipeName);
uv_fs_t req;
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_pipe_t server;
uv_pipe_init(global.loop, &server, 0);
signal(SIGINT, removeListeningPipe);
int r;
fnInfo("bind to pipe %s", global.listenPipeName);
if ((r = uv_pipe_bind(&server, listenPipeName))) {
fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(0);
return -1;
}
if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) {
fnError("Listen error %s", uv_err_name(r));
removeListeningPipe(0);
return -2;
}
return 0; return 0;
} }
int main() { static int32_t udfdRun() {
debugPrint("libuv version: %x", UV_VERSION_HEX); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex);
loop = uv_default_loop(); //TOOD: client rpc to fetch udf function info from mnode
uv_fs_t req; if (udfdOpenClientRpc() != 0) {
uv_fs_unlink(loop, &req, "udf.sock", NULL); fnError("open rpc connection to mnode failure");
return -1;
}
uv_pipe_t server; if (udfdUvInit() != 0) {
uv_pipe_init(loop, &server, 0); fnError("uv init failure");
return -2;
}
signal(SIGINT, removeListeningPipe); fnInfo("start the udfd");
int code = uv_run(global.loop, UV_RUN_DEFAULT);
fnInfo("udfd stopped. result: %s", uv_err_name(code));
int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return code;
}
int r; int main(int argc, char* argv[]) {
if ((r = uv_pipe_bind(&server, "udf.sock"))) { if (!taosCheckSystemIsSmallEnd()) {
debugPrint("Bind error %s\n", uv_err_name(r)); printf("failed to start since on non-small-end machines\n");
removeListeningPipe(0); return -1;
return 1; }
}
if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) { if (udfdParseArgs(argc, argv) != 0) {
debugPrint("Listen error %s", uv_err_name(r)); printf("failed to start since parse args error\n");
return 2; return -1;
} }
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop); if (global.printVersion) {
udfdPrintVersion();
return 0;
}
if (udfdInitLog() != 0) {
printf("failed to start since init log error\n");
return -1;
}
if (taosInitCfg(configDir, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
}
return udfdRun();
} }
...@@ -91,6 +91,7 @@ int32_t sDebugFlag = 135; ...@@ -91,6 +91,7 @@ int32_t sDebugFlag = 135;
int32_t tsdbDebugFlag = 131; int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 135; int32_t tqDebugFlag = 135;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t fnDebugFlag = 135;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;
...@@ -752,6 +753,7 @@ void taosSetAllDebugFlag(int32_t flag) { ...@@ -752,6 +753,7 @@ void taosSetAllDebugFlag(int32_t flag) {
tsdbDebugFlag = flag; tsdbDebugFlag = flag;
tqDebugFlag = flag; tqDebugFlag = flag;
fsDebugFlag = flag; fsDebugFlag = flag;
fnDebugFlag = flag;
uInfo("all debug flag are set to %d", flag); uInfo("all debug flag are set to %d", flag);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册