未验证 提交 d6f6c649 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18917 from taosdata/szhou/cenc

fix: udfd pipe close before send response
......@@ -837,9 +837,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
}
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
output->info.rows = input->numOfRows;
int32_t numOfRows = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
}
output->info.rows = numOfRows;
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
if ((input+i)->numOfRows < numOfRows) {
SColumnInfoData* pColInfoData = (input+i)->columnData;
int32_t startRow = (input+i)->numOfRows;
int32_t expandRows = numOfRows - startRow;
colInfoDataEnsureCapacity(pColInfoData, numOfRows, false);
bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1);
if (isNull) {
colDataAppendNNULL(pColInfoData, startRow, expandRows);
} else {
char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
int32_t bytes = pColInfoData->info.bytes;
char* data = taosMemoryMalloc(bytes);
memcpy(data, src, bytes);
for (int j = 0; j < expandRows; ++j) {
colDataAppend(pColInfoData, startRow+j, data, false);
}
//colDataAppendNItems(pColInfoData, startRow, data, expandRows);
taosMemoryFree(data);
}
}
taosArrayPush(output->pDataBlock, (input + i)->columnData);
if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) {
......
......@@ -28,39 +28,46 @@
#include "tmsg.h"
#include "trpc.h"
#include "tmisce.h"
// clang-foramt on
// clang-format on
typedef struct SUdfdContext {
uv_loop_t * loop;
uv_loop_t *loop;
uv_pipe_t ctrlPipe;
uv_signal_t intrSignal;
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_pipe_t listeningPipe;
void * clientRpc;
void *clientRpc;
SCorEpSet mgmtEp;
uv_mutex_t udfsMutex;
SHashObj * udfsHash;
SHashObj *udfsHash;
SArray* residentFuncs;
SArray *residentFuncs;
bool printVersion;
} SUdfdContext;
SUdfdContext global;
struct SUdfdUvConn;
struct SUvUdfWork;
typedef struct SUdfdUvConn {
uv_stream_t *client;
char * inputBuf;
char *inputBuf;
int32_t inputLen;
int32_t inputCap;
int32_t inputTotal;
struct SUvUdfWork *pWorkList; // head of work list
} SUdfdUvConn;
typedef struct SUvUdfWork {
uv_stream_t *client;
SUdfdUvConn *conn;
uv_buf_t input;
uv_buf_t output;
struct SUvUdfWork *pWorkNext;
} SUvUdfWork;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
......@@ -70,7 +77,7 @@ typedef struct SUdf {
EUdfState state;
uv_mutex_t lock;
uv_cond_t condReady;
bool resident;
bool resident;
char name[TSDB_FUNC_NAME_LEN + 1];
int8_t funcType;
......@@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType {
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void * param;
void *param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
......@@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = NULL;
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
......@@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_cond_init(&udfNew->condReady);
udf = udfNew;
SUdf** pUdf = &udf;
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
}
......@@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) {
udf->resident = true;
break;
......@@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call;
fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64 , call->callType, call->udfHandle, request->seqNum);
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf * udf = handle->udf;
fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
request->seqNum);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf;
SUdfResponse response = {0};
SUdfResponse * rsp = &response;
SUdfResponse *rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf * udf = handle->udf;
SUdf *udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -409,15 +417,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - connectRsp.svrTimestamp);
if (delta > 900) {
msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
goto _return;
}
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_APP_ERROR;
goto _return;
......@@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf * udf = msgInfo->param;
SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType;
......@@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void * pReq = rpcMallocCont(contLen);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
......@@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() {
connReq.startTime = taosGetTimestampMs();
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void * pReq = rpcMallocCont(contLen);
void *pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
......@@ -589,7 +596,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
......@@ -601,9 +608,10 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
}
return true;
} else {
return false;
......@@ -663,7 +671,7 @@ int32_t udfdOpenClientRpc() {
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
......@@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) {
if (status < 0) {
fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status));
}
// remove work from the connection work list
if (work->conn != NULL) {
SUvUdfWork **ppWork;
for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) {
}
if (*ppWork == work) {
*ppWork = work->pWorkNext;
} else {
fnError("work not in conn any more");
}
}
taosMemoryFree(work->output.base);
taosMemoryFree(work);
taosMemoryFree(req);
......@@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) {
void udfdSendResponse(uv_work_t *work, int status) {
SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
write_req->data = udfWork;
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
if (udfWork->conn != NULL) {
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
write_req->data = udfWork;
uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
}
taosMemoryFree(work);
}
......@@ -716,8 +736,8 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->len = 0;
}
} else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = msgHeadSize - ctx->inputLen;
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = msgHeadSize - ctx->inputLen;
} else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
......@@ -744,10 +764,15 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void udfdHandleRequest(SUdfdUvConn *conn) {
uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t));
char *inputBuf = conn->inputBuf;
int32_t inputLen = conn->inputLen;
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
udfWork->client = conn->client;
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
udfWork->conn = conn;
udfWork->pWorkNext = conn->pWorkList;
conn->pWorkList = udfWork;
udfWork->input = uv_buf_init(inputBuf, inputLen);
conn->inputBuf = NULL;
conn->inputLen = 0;
conn->inputCap = 0;
......@@ -758,13 +783,19 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data;
SUvUdfWork* pWork = conn->pWorkList;
while (pWork != NULL) {
pWork->conn = NULL;
pWork = pWork->pWorkNext;
}
taosMemoryFree(conn->client);
taosMemoryFree(conn->inputBuf);
taosMemoryFree(conn);
}
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnDebug("udf read %zd bytes from client", nread);
fnDebug("udfd read %zd bytes from client", nread);
if (nread == 0) return;
SUdfdUvConn *conn = client->data;
......@@ -780,10 +811,10 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if (nread < 0) {
fnError("Receive error %s", uv_err_name(nread));
if (nread == UV_EOF) {
// TODO check more when close
fnInfo("udfd pipe read EOF");
} else {
fnError("Receive error %s", uv_err_name(nread));
}
udfdUvHandleError(conn);
}
......@@ -799,6 +830,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
uv_pipe_init(global.loop, client, 0);
if (uv_accept(server, (uv_stream_t *)client) == 0) {
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
ctx->pWorkList = NULL;
ctx->client = (uv_stream_t *)client;
ctx->inputBuf = 0;
ctx->inputLen = 0;
......@@ -891,7 +923,7 @@ static int32_t udfdUvInit() {
}
global.loop = loop;
if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
uv_pipe_init(global.loop, &global.ctrlPipe, 1);
uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
......@@ -966,10 +998,10 @@ int32_t udfdInitResidentFuncs() {
}
global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
char* pSave = tsUdfdResFuncs;
char* token;
char *pSave = tsUdfdResFuncs;
char *token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN+1] = {0};
char func[TSDB_FUNC_NAME_LEN + 1] = {0};
strncpy(func, token, TSDB_FUNC_NAME_LEN);
fnInfo("udfd add resident function %s", func);
taosArrayPush(global.residentFuncs, func);
......@@ -980,10 +1012,10 @@ int32_t udfdInitResidentFuncs() {
int32_t udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i);
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
char *funcName = taosArrayGet(global.residentFuncs, i);
SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
SUdf* udf = *udfInHash;
SUdf *udf = *udfInHash;
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
......
......@@ -174,6 +174,7 @@
,,y,script,./test.sh -f tsim/query/scalarNull.sim
,,y,script,./test.sh -f tsim/query/session.sim
,,y,script,./test.sh -f tsim/query/udf.sim
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
set +e
rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so
rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so /tmp/udf/libgpd.so
mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so
gcc -fPIC -shared sh/l2norm.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libl2norm.so
gcc -fPIC -shared sh/gpd.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libgpd.so
echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so
......@@ -12,13 +12,10 @@
TAOS* taos = NULL;
DLL_EXPORT int32_t gpd_init() {
taos = taos_connect("localhost", "root", "taosdata", "", 7100);
return 0;
}
DLL_EXPORT int32_t gpd_destroy() {
taos_close(taos);
taos_cleanup();
return 0;
}
......@@ -32,43 +29,18 @@ DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if ( j == block->numOfCols) {
int32_t luckyNum = 88;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
int64_t* calc_ts = (int64_t*)udfColDataGetData(block->udfCols[0], i);
char* varTbname = udfColDataGetData(block->udfCols[1], i);
char* varDbname = udfColDataGetData(block->udfCols[2], i);
char dbName[256] = {0};
char tblName[256] = {0};
memcpy(dbName, varDataVal(varDbname), varDataLen(varDbname));
memcpy(tblName, varDataVal(varTbname), varDataLen(varTbname));
printf("%s, %s\n", dbName, tblName);
int32_t result = 0;
udfColDataSet(resultCol, i, (char*)&result, false);
}
TAOS_RES* res = taos_query(taos, "create database if not exists gpd");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) ");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
taos_query(taos, "select * from gpd.t");
if (taos_errno(res) != 0) {
char* errstr = taos_errstr(res);
}
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0;
}
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step1 udf
system sh/compile_udf.sh
sql create database udf vgroups 3;
sql use udf;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2)(now+2s,3)(now+3s,4)(now+4s,5)(now+5s,6)(now+6s,7);
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
if $system_content == Windows_NT then
sql create function gpd as 'C:\\Windows\\Temp\\gpd.dll' outputtype int bufSize 8;
else
sql create function gpd as '/tmp/udf/libgpd.so' outputtype int bufSize 8;
endi
sql show functions;
if $rows != 1 then
return -1
endi
sql select gpd(ts, tbname, 'detail') from t;
if $rows != 7 then
return -1
endi
print $data00 $data10
if $data00 != @0@ then
return -1
endi
sql drop function gpd;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册