提交 6c4400c3 编写于 作者: S slzhou

fix: the buf returend by the udf has a smaller size than the function bufsize

上级 44e3b115
......@@ -899,9 +899,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
int32_t interResBufLen;
char *finalResBuf;
char *interResBuf;
} SUdfAggRes;
void onUdfcPipeClose(uv_handle_t *handle);
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
......@@ -1096,9 +1098,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
udfRes->interResBufLen = buf.bufLen;
udfRes->interResNum = buf.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
......@@ -1136,7 +1139,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
......@@ -1144,17 +1147,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
udfRes->interResBufLen = newState.bufLen;
udfRes->interResNum = newState.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
blockDataDestroy(inputBlock);
......@@ -1180,7 +1183,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
int32_t udfCallCode = 0;
udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
......
......@@ -173,6 +173,7 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi
// for others, dlopen/dlsym to find function pointers
typedef struct SUdfScriptPlugin {
int8_t scriptType;
const char* scriptSuffix;
char libPath[PATH_MAX];
bool libLoaded;
......@@ -313,6 +314,7 @@ static void udfdConnectMnodeThreadFunc(void *args);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->scriptSuffix = '.so';
plugin->openFunc = udfdCPluginOpen;
plugin->closeFunc = udfdCPluginClose;
plugin->udfInitFunc = udfdCPluginUdfInit;
......@@ -346,6 +348,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
plugin->scriptSuffix = "py";
//todo: windows support
sprintf(plugin->libPath, "%s", "libtaospyudf.so");
plugin->libLoaded = false;
......@@ -776,7 +779,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix;
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
msgInfo->code = terrno;
......@@ -786,9 +789,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name);
snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix);
#else
snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name);
snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册