提交 23d4eef1 编写于 作者: S shenglian zhou

feat: startup of replace function when no active query

上级 ed48713a
...@@ -1054,6 +1054,7 @@ typedef struct { ...@@ -1054,6 +1054,7 @@ typedef struct {
int64_t signature; int64_t signature;
char* pComment; char* pComment;
char* pCode; char* pCode;
int8_t orReplace;
} SCreateFuncReq; } SCreateFuncReq;
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq); int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
...@@ -1095,6 +1096,7 @@ typedef struct { ...@@ -1095,6 +1096,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t numOfFuncs; int32_t numOfFuncs;
SArray* pFuncInfos; SArray* pFuncInfos;
SArray* pFuncVersions;
} SRetrieveFuncRsp; } SRetrieveFuncRsp;
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
......
...@@ -1702,6 +1702,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq ...@@ -1702,6 +1702,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1; if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->orReplace) <0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1744,6 +1746,8 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR ...@@ -1744,6 +1746,8 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
} }
if (tDecoodeI8(&decoder, &pReq->orReplace) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -1855,6 +1859,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * ...@@ -1855,6 +1859,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
} }
} }
if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncVersions)) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
int32_t version = *(int32_t*)taosArrayGet(pRsp->pFuncVersions, i);
if (tEncodeI32(&encoder, version) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1902,6 +1912,15 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp ...@@ -1902,6 +1912,15 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
taosArrayPush(pRsp->pFuncInfos, &fInfo); taosArrayPush(pRsp->pFuncInfos, &fInfo);
} }
pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t));
if (pRsp->pFuncVersions == NULL) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
int32_t version = 0;
if (tDecodeI32(&decoder, &version) < 0) return -1;
taosArrayPush(pRsp->pFuncVersions, &version);
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -246,6 +246,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat ...@@ -246,6 +246,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat
typedef struct SUdf { typedef struct SUdf {
char name[TSDB_FUNC_NAME_LEN + 1]; char name[TSDB_FUNC_NAME_LEN + 1];
int32_t version;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
...@@ -833,13 +834,13 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -833,13 +834,13 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return; goto _return;
} }
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
// SUdf *udf = msgInfo->param;
SUdf *udf = msgInfo->param; SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType; udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType; udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType; udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen; udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize; udf->bufSize = pFuncInfo->bufSize;
udf->version = *(int32_t*)taosArrayGet(retrieveRsp.pFuncVersions,0);
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK; terrno = TSDB_CODE_NO_AVAIL_DISK;
...@@ -850,9 +851,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -850,9 +851,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
#ifdef WINDOWS #ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); snprintf(path, sizeof(path), "%s%s%d", tsTempDir, pFuncInfo->name, udf->version);
#else #else
snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); snprintf(path, sizeof(path), "%s/%s%d", tsTempDir, pFuncInfo->name, udf->version);
#endif #endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) { if (file == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册