未验证 提交 5ce62201 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20781 from taosdata/szhou/replace-function

feature: udf replace function
...@@ -1093,9 +1093,15 @@ typedef struct { ...@@ -1093,9 +1093,15 @@ typedef struct {
char* pCode; char* pCode;
} SFuncInfo; } SFuncInfo;
typedef struct {
int32_t funcVersion;
int64_t funcCreatedTime;
} SFuncExtraInfo;
typedef struct { typedef struct {
int32_t numOfFuncs; int32_t numOfFuncs;
SArray* pFuncInfos; SArray* pFuncInfos;
SArray* pFuncExtraInfos;
} SRetrieveFuncRsp; } SRetrieveFuncRsp;
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
......
...@@ -276,6 +276,8 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU ...@@ -276,6 +276,8 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU
typedef struct SScriptUdfInfo { typedef struct SScriptUdfInfo {
const char *name; const char *name;
int32_t version;
int64_t createdTime;
EUdfFuncType funcType; EUdfFuncType funcType;
int8_t scriptType; int8_t scriptType;
......
...@@ -116,6 +116,7 @@ static const SSysDbTableSchema userFuncSchema[] = { ...@@ -116,6 +116,7 @@ static const SSysDbTableSchema userFuncSchema[] = {
{.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "func_language", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "func_language", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "func_body", .bytes = TSDB_MAX_BINARY_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "func_body", .bytes = TSDB_MAX_BINARY_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "func_version", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
}; };
static const SSysDbTableSchema userIdxSchema[] = { static const SSysDbTableSchema userIdxSchema[] = {
......
...@@ -1702,6 +1702,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq ...@@ -1702,6 +1702,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1; if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1; if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -1746,6 +1747,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR ...@@ -1746,6 +1747,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
} }
if (!tDecodeIsEnd(&decoder)) { if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1; if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
} else { } else {
...@@ -1863,6 +1865,13 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * ...@@ -1863,6 +1865,13 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
} }
} }
if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncExtraInfos)) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
SFuncExtraInfo *extraInfo = taosArrayGet(pRsp->pFuncExtraInfos, i);
if (tEncodeI32(&encoder, extraInfo->funcVersion) < 0) return -1;
if (tEncodeI64(&encoder, extraInfo->funcCreatedTime) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1910,6 +1919,22 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp ...@@ -1910,6 +1919,22 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
taosArrayPush(pRsp->pFuncInfos, &fInfo); taosArrayPush(pRsp->pFuncInfos, &fInfo);
} }
pRsp->pFuncExtraInfos = taosArrayInit(pRsp->numOfFuncs, sizeof(SFuncExtraInfo));
if (pRsp->pFuncExtraInfos == NULL) return -1;
if (tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
SFuncExtraInfo extraInfo = { 0 };
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
}
} else {
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
SFuncExtraInfo extraInfo = { 0 };
if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1;
if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1;
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -1932,6 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { ...@@ -1932,6 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) {
tFreeSFuncInfo(pInfo); tFreeSFuncInfo(pInfo);
} }
taosArrayDestroy(pRsp->pFuncInfos); taosArrayDestroy(pRsp->pFuncInfos);
taosArrayDestroy(pRsp->pFuncExtraInfos);
} }
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) {
......
...@@ -447,6 +447,8 @@ typedef struct { ...@@ -447,6 +447,8 @@ typedef struct {
int32_t codeSize; int32_t codeSize;
char* pComment; char* pComment;
char* pCode; char* pCode;
int32_t funcVersion;
SRWLatch lock;
} SFuncObj; } SFuncObj;
typedef struct { typedef struct {
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define SDB_FUNC_VER 1 #define SDB_FUNC_VER 2
#define SDB_FUNC_RESERVE_SIZE 64 #define SDB_FUNC_RESERVE_SIZE 64
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
...@@ -83,6 +83,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { ...@@ -83,6 +83,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER) SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
} }
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersion, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER); SDB_SET_DATALEN(pRaw, dataPos, _OVER);
...@@ -107,7 +108,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { ...@@ -107,7 +108,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != SDB_FUNC_VER) { if (sver != 1 && sver != 2) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER; goto _OVER;
} }
...@@ -144,8 +145,15 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { ...@@ -144,8 +145,15 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
goto _OVER; goto _OVER;
} }
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
if (sver >= 2) {
SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersion, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pFunc->lock);
terrno = 0; terrno = 0;
_OVER: _OVER:
...@@ -173,6 +181,44 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { ...@@ -173,6 +181,44 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock);
pOld->align = pNew->align;
pOld->bufSize = pNew->bufSize;
pOld->codeSize = pNew->codeSize;
pOld->commentSize = pNew->commentSize;
pOld->createdTime = pNew->createdTime;
pOld->funcType = pNew->funcType;
pOld->funcVersion = pNew->funcVersion;
pOld->outputLen = pNew->outputLen;
pOld->outputType = pNew->outputType;
if (pOld->pComment != NULL) {
taosMemoryFree(pOld->pComment);
pOld->pComment = NULL;
}
if (pNew->commentSize > 0 && pNew->pComment != NULL) {
pOld->commentSize = pNew->commentSize;
pOld->pComment = taosMemoryMalloc(pOld->commentSize);
memcpy(pOld->pComment, pNew->pComment, pOld->commentSize);
}
if (pOld->pCode != NULL) {
taosMemoryFree(pOld->pCode);
pOld->pCode = NULL;
}
if (pNew->codeSize > 0 && pNew->pCode != NULL) {
pOld->codeSize = pNew->codeSize;
pOld->pCode = taosMemoryMalloc(pOld->codeSize);
memcpy(pOld->pCode, pNew->pCode, pOld->codeSize);
}
pOld->scriptType = pNew->scriptType;
pOld->signature = pNew->signature;
taosWUnLockLatch(&pOld->lock);
return 0; return 0;
} }
...@@ -225,26 +271,47 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre ...@@ -225,26 +271,47 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name); mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; if (pCreate->orReplace == 1 && oldFunc != NULL) {
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; func.funcVersion = oldFunc->funcVersion + 1;
func.createdTime = oldFunc->createdTime;
SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) goto _OVER;
SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) goto _OVER;
SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER;
} else {
SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER;
SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER; if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER;
SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:
if (oldFunc != NULL) {
mndReleaseFunc(pMnode, oldFunc);
}
taosMemoryFree(func.pCode); taosMemoryFree(func.pCode);
taosMemoryFree(func.pComment); taosMemoryFree(func.pComment);
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -304,6 +371,9 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { ...@@ -304,6 +371,9 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
mInfo("func:%s, already exist, ignore exist is set", createReq.name); mInfo("func:%s, already exist, ignore exist is set", createReq.name);
code = 0; code = 0;
goto _OVER; goto _OVER;
} else if (createReq.orReplace) {
mInfo("func:%s, replace function is set", createReq.name);
code = 0;
} else { } else {
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
goto _OVER; goto _OVER;
...@@ -413,6 +483,12 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { ...@@ -413,6 +483,12 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo));
if (retrieveRsp.pFuncExtraInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) { for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) {
char *funcName = taosArrayGet(retrieveReq.pFuncNames, i); char *funcName = taosArrayGet(retrieveReq.pFuncNames, i);
...@@ -451,6 +527,11 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { ...@@ -451,6 +527,11 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
} }
} }
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
SFuncExtraInfo extraInfo = {0};
extraInfo.funcVersion = pFunc->funcVersion;
extraInfo.funcCreatedTime = pFunc->createdTime;
taosArrayPush(retrieveRsp.pFuncExtraInfos, &extraInfo);
mndReleaseFunc(pMnode, pFunc); mndReleaseFunc(pMnode, pFunc);
} }
...@@ -547,7 +628,7 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -547,7 +628,7 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->bufSize, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->bufSize, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char* language = ""; char *language = "";
if (pFunc->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { if (pFunc->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
language = "C"; language = "C";
} else if (pFunc->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { } else if (pFunc->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
...@@ -559,13 +640,18 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -559,13 +640,18 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false); colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN ? TSDB_MAX_BINARY_LEN : pFunc->codeSize + VARSTR_HEADER_SIZE; int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN
char *b4 = taosMemoryMalloc(varCodeLen); ? TSDB_MAX_BINARY_LEN
: pFunc->codeSize + VARSTR_HEADER_SIZE;
char *b4 = taosMemoryMalloc(varCodeLen);
memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE); memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE);
varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE); varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfo, numOfRows, (const char*)b4, false); colDataSetVal(pColInfo, numOfRows, (const char *)b4, false);
taosMemoryFree(b4); taosMemoryFree(b4);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->funcVersion, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pFunc); sdbRelease(pSdb, pFunc);
} }
......
...@@ -672,12 +672,17 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR ...@@ -672,12 +672,17 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR
SRetrieveFuncRsp funcRsp = {0}; SRetrieveFuncRsp funcRsp = {0};
funcRsp.numOfFuncs = 1; funcRsp.numOfFuncs = 1;
funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo)); funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo));
funcRsp.pFuncExtraInfos = taosArrayInit(1, sizeof(SFuncExtraInfo));
SFuncInfo funcInfo = {0}; SFuncInfo funcInfo = {0};
strcpy(funcInfo.name, "func1"); strcpy(funcInfo.name, "func1");
funcInfo.funcType = ctgTestFuncType; funcInfo.funcType = ctgTestFuncType;
(void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo);
SFuncExtraInfo extraInfo = {0};
extraInfo.funcVersion = 0;
extraInfo.funcCreatedTime = taosGetTimestampMs();
(void)taosArrayPush(funcRsp.pFuncExtraInfos, &extraInfo);
int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp);
void *pReq = rpcMallocCont(rspLen); void *pReq = rpcMallocCont(rspLen);
tSerializeSRetrieveFuncRsp(pReq, rspLen, &funcRsp); tSerializeSRetrieveFuncRsp(pReq, rspLen, &funcRsp);
......
...@@ -1400,8 +1400,9 @@ void udfcUvHandleError(SClientUvConn *conn) { ...@@ -1400,8 +1400,9 @@ void udfcUvHandleError(SClientUvConn *conn) {
QUEUE_REMOVE(&task->procTaskQueue); QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
if (!uv_is_closing((uv_handle_t *)conn->pipe)) {
uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose); uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose);
}
} }
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
...@@ -1553,7 +1554,9 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1553,7 +1554,9 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
} else { } else {
SClientUvConn *conn = pipe->data; SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose); if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
}
code = 0; code = 0;
} }
break; break;
......
...@@ -53,6 +53,45 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } ...@@ -53,6 +53,45 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
int32_t udfdCPluginClose() { return 0; } int32_t udfdCPluginClose() { return 0; }
const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc));
return udfName;
}
void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, sizeof(startFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
}
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
int32_t err = 0; int32_t err = 0;
SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
...@@ -62,45 +101,19 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { ...@@ -62,45 +101,19 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
return TSDB_CODE_UDF_LOAD_UDF_FAILURE; return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
} }
const char *udfName = udf->name; const char *udfName = udf->name;
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc));
if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName); strcpy(processFuncName, udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc));
} else if (udf->funcType == UDF_FUNC_TYPE_AGG) { } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
strcpy(processFuncName, udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, sizeof(startFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
} }
int32_t code = 0; int32_t code = 0;
if (udfCtx->initFunc) { if (udfCtx->initFunc) {
// TODO: handle init call return error
code = (udfCtx->initFunc)(); code = (udfCtx->initFunc)();
if (code != 0) { if (code != 0) {
uv_dlclose(&udfCtx->lib); uv_dlclose(&udfCtx->lib);
...@@ -216,6 +229,7 @@ typedef struct SUdfdContext { ...@@ -216,6 +229,7 @@ typedef struct SUdfdContext {
SArray *residentFuncs; SArray *residentFuncs;
char udfDataDir[PATH_MAX];
bool printVersion; bool printVersion;
} SUdfdContext; } SUdfdContext;
...@@ -245,7 +259,9 @@ typedef struct SUvUdfWork { ...@@ -245,7 +259,9 @@ typedef struct SUvUdfWork {
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState; typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
typedef struct SUdf { typedef struct SUdf {
char name[TSDB_FUNC_NAME_LEN + 1]; char name[TSDB_FUNC_NAME_LEN + 1];
int32_t version;
int64_t createdTime;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
...@@ -263,9 +279,11 @@ typedef struct SUdf { ...@@ -263,9 +279,11 @@ typedef struct SUdf {
SUdfScriptPlugin *scriptPlugin; SUdfScriptPlugin *scriptPlugin;
void *scriptUdfCtx; void *scriptUdfCtx;
int64_t lastFetchTime; // last fetch time in milliseconds
bool expired;
} SUdf; } SUdf;
// TODO: add private udf structure.
typedef struct SUdfcFuncHandle { typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfcFuncHandle; } SUdfcFuncHandle;
...@@ -318,6 +336,9 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); ...@@ -318,6 +336,9 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun(); static int32_t udfdRun();
static void udfdConnectMnodeThreadFunc(void *args); static void udfdConnectMnodeThreadFunc(void *args);
SUdf *udfdNewUdf(const char *udfName);
void udfdGetFuncBodyPath(const SUdf *udf, char *path);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->openFunc = udfdCPluginOpen; plugin->openFunc = udfdCPluginOpen;
...@@ -370,12 +391,13 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { ...@@ -370,12 +391,13 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
} }
if (plugin->openFunc) { if (plugin->openFunc) {
int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; // tsTempDir:tsUdfdLdLibPath int16_t lenPythonPath =
char *pythonPath = taosMemoryMalloc(lenPythonPath); strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath
char *pythonPath = taosMemoryMalloc(lenPythonPath);
#ifdef WINDOWS #ifdef WINDOWS
snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath); snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else #else
snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif #endif
SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
err = plugin->openFunc(items, 2); err = plugin->openFunc(items, 2);
...@@ -500,28 +522,14 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { ...@@ -500,28 +522,14 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
udfInfo->funcType = UDF_FUNC_TYPE_SCALAR; udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
} }
udfInfo->name = udf->name; udfInfo->name = udf->name;
udfInfo->version = udf->version;
udfInfo->createdTime = udf->createdTime;
udfInfo->outputLen = udf->outputLen; udfInfo->outputLen = udf->outputLen;
udfInfo->outputType = udf->outputType; udfInfo->outputType = udf->outputType;
udfInfo->path = udf->path; udfInfo->path = udf->path;
udfInfo->scriptType = udf->scriptType; udfInfo->scriptType = udf->scriptType;
} }
int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name);
} else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
int32_t code = taosRenameFile(udf->path, newPath);
if (code == 0) {
sprintf(udf->path, "%s", newPath);
}
return 0;
}
int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t udfdInitUdf(char *udfName, SUdf *udf) {
int32_t err = 0; int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
...@@ -546,8 +554,6 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { ...@@ -546,8 +554,6 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
uv_mutex_unlock(&global.scriptPluginsMutex); uv_mutex_unlock(&global.scriptPluginsMutex);
udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; udf->scriptPlugin = global.scriptPlugins[udf->scriptType];
udfdRenameUdfFile(udf);
SScriptUdfInfo info = {0}; SScriptUdfInfo info = {0};
convertUdf2UdfInfo(udf, &info); convertUdf2UdfInfo(udf, &info);
err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
...@@ -556,40 +562,60 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { ...@@ -556,40 +562,60 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
return err; return err;
} }
fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void*)udf->scriptUdfCtx); fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
return 0; return 0;
} }
SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->lastFetchTime = taosGetTimestampMs();
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udfNew->resident = false;
udfNew->expired = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(udfName, funcName) == 0) {
udfNew->resident = true;
break;
}
}
return udfNew;
}
SUdf *udfdGetOrCreateUdf(const char *udfName) { SUdf *udfdGetOrCreateUdf(const char *udfName) {
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
if (udfInHash) { int64_t currTime = taosGetTimestampSec();
++(*udfInHash)->refCount; bool expired = false;
udf = *udfInHash; if (pUdfHash) {
uv_mutex_unlock(&global.udfsMutex); expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
} else { if (!expired) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); ++(*pUdfHash)->refCount;
udfNew->refCount = 1; SUdf *udf = *pUdfHash;
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); uv_mutex_unlock(&global.udfsMutex);
fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version,
udfNew->state = UDF_STATE_INIT; udf->createdTime);
uv_mutex_init(&udfNew->lock); return udf;
uv_cond_init(&udfNew->condReady); } else {
(*pUdfHash)->expired = true;
udf = udfNew; taosHashRemove(global.udfsHash, udfName, strlen(udfName));
udf->resident = false; fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(udfName, funcName) == 0) {
udf->resident = true;
break;
}
} }
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
} }
SUdf *udf = udfdNewUdf(udfName);
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
return udf; return udf;
} }
...@@ -760,13 +786,13 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -760,13 +786,13 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
udf->refCount--; udf->refCount--;
if (udf->refCount == 0 && !udf->resident) { if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
unloadUdf = true; unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
} }
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) { if (unloadUdf) {
fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void*)(udf->scriptUdfCtx)); fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
uv_cond_destroy(&udf->condReady); uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock); uv_mutex_destroy(&udf->lock);
code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
...@@ -791,6 +817,61 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -791,6 +817,61 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
return; return;
} }
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#endif
} else {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#endif
}
}
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
if (!osDataSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
fnError("udfd create shared library failed since %s", terrstr(terrno));
return terrno;
}
char path[PATH_MAX] = {0};
udfdGetFuncBodyPath(udf, path);
bool fileExist = !(taosStatFile(path, NULL, NULL) < 0);
if (fileExist) {
strncpy(udf->path, path, PATH_MAX);
fnInfo("udfd func body file. reuse existing file %s", path);
return TSDB_CODE_SUCCESS;
}
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
return TSDB_CODE_FILE_CORRUPTED;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
return TSDB_CODE_FILE_CORRUPTED;
}
taosCloseFile(&file);
strncpy(udf->path, path, PATH_MAX);
return TSDB_CODE_SUCCESS;
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
...@@ -829,49 +910,25 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -829,49 +910,25 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
if (retrieveRsp.pFuncInfos == NULL) {
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
// SUdf *udf = msgInfo->param; SUdf *udf = msgInfo->param;
SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType; udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType; udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType; udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen; udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize; udf->bufSize = pFuncInfo->bufSize;
if (!osTempSpaceAvailable()) { SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
terrno = TSDB_CODE_NO_AVAIL_DISK; udf->version = pFuncExtraInfo->funcVersion;
msgInfo->code = terrno; udf->createdTime = pFuncExtraInfo->funcCreatedTime;
fnError("udfd create shared library failed since %s", terrstr(terrno)); msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
goto _return; if (msgInfo->code != 0) {
} udf->lastFetchTime = 0;
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name);
#else
snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
} }
taosCloseFile(&file);
strncpy(udf->path, path, PATH_MAX);
tFreeSFuncInfo(pFuncInfo); tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos); taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0; taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
} }
_return: _return:
...@@ -1380,6 +1437,24 @@ int32_t udfdCleanup() { ...@@ -1380,6 +1437,24 @@ int32_t udfdCleanup() {
return 0; return 0;
} }
int32_t udfdCreateUdfSourceDir() {
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
int32_t code = taosMkDir(global.udfDataDir);
if (code != TSDB_CODE_SUCCESS) {
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
code = taosMkDir(global.udfDataDir);
}
fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
return code;
}
int32_t udfdDestroyUdfSourceDir() {
fnInfo("destory udf source directory %s", global.udfDataDir);
taosRemoveDir(global.udfDataDir);
return 0;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
if (!taosCheckSystemIsLittleEnd()) { if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n"); printf("failed to start since on non-little-end machines\n");
...@@ -1408,10 +1483,15 @@ int main(int argc, char *argv[]) { ...@@ -1408,10 +1483,15 @@ int main(int argc, char *argv[]) {
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
if (udfdOpenClientRpc() != 0) { if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure"); fnError("open rpc connection to mnode failed");
return -3; return -3;
} }
if (udfdCreateUdfSourceDir() != 0) {
fnError("create udf source directory failed");
return -4;
}
if (udfdUvInit() != 0) { if (udfdUvInit() != 0) {
fnError("uv init failure"); fnError("uv init failure");
return -5; return -5;
...@@ -1425,6 +1505,7 @@ int main(int argc, char *argv[]) { ...@@ -1425,6 +1505,7 @@ int main(int argc, char *argv[]) {
udfdRun(); udfdRun();
removeListeningPipe(); removeListeningPipe();
udfdDestroyUdfSourceDir();
udfdCloseClientRpc(); udfdCloseClientRpc();
udfdDeinitResidentFuncs(); udfdDeinitResidentFuncs();
......
...@@ -587,7 +587,8 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { ...@@ -587,7 +587,8 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
memcpy(output, funcInfo, sizeof(*funcInfo)); memcpy(output, funcInfo, sizeof(*funcInfo));
taosArrayDestroy(out.pFuncInfos); taosArrayDestroy(out.pFuncInfos);
taosArrayDestroy(out.pFuncExtraInfos);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -280,7 +280,37 @@ if $data20 != 8.000000000 then ...@@ -280,7 +280,37 @@ if $data20 != 8.000000000 then
return -1 return -1
endi endi
sql create or replace function bit_and as '/tmp/udf/libbitand.so' outputtype int
sql select func_version from information_schema.ins_functions where name='bit_and'
if $data00 != 1 then
return -1
endi
sql select bit_and(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30 , $data40 , $data50
if $rows != 6 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data40 != 0 then
return -1
endi
if $data50 != 1 then
return -1
endi
#sql drop function bit_and; #sql drop function bit_and;
#sql show functions; #sql show functions;
#if $rows != 1 then #if $rows != 1 then
......
...@@ -22,7 +22,7 @@ class TDTestCase: ...@@ -22,7 +22,7 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns") tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 274) tdSql.checkData(0, 0, 275)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14) tdSql.checkRows(14)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册