提交 604e1939 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

...@@ -122,7 +122,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); ...@@ -122,7 +122,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t teardownUdfs(); int32_t cleanUpUdfs();
// end API to taosd and qworker // end API to taosd and qworker
//============================================================================================================================= //=============================================================================================================================
// begin API to UDF writer. // begin API to UDF writer.
......
...@@ -157,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { ...@@ -157,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
pTaskInfo->totalRows += current; pTaskInfo->totalRows += current;
teardownUdfs(); cleanUpUdfs();
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
......
...@@ -25,8 +25,6 @@ ...@@ -25,8 +25,6 @@
#include "functionMgt.h" #include "functionMgt.h"
//TODO: add unit test //TODO: add unit test
//TODO: include all global variable under context struct
typedef struct SUdfdData { typedef struct SUdfdData {
bool startCalled; bool startCalled;
bool needCleanUp; bool needCleanUp;
...@@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
} }
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.udfcState != UDFC_STATE_READY) { if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE; return TSDB_CODE_UDF_INVALID_STATE;
} }
...@@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if (task->errCode != 0) { if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else { } else {
fnInfo("sucessfully setup udf func handle. handle: %p", task->session); fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
*funcHandle = task->session; *funcHandle = task->session;
} }
int32_t err = task->errCode; int32_t err = task->errCode;
...@@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, ...@@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
return code; return code;
} }
//TODO: when to teardown udf. teardown udf is not called
int32_t doTeardownUdf(UdfcFuncHandle handle) { int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf func handle: %p", handle);
SUdfcUvSession *session = (SUdfcUvSession *) handle; SUdfcUvSession *session = (SUdfcUvSession *) handle;
if (session->udfUvPipe == NULL) { if (session->udfUvPipe == NULL) {
fnError("pipe to udfd does not exist"); fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
return TSDB_CODE_UDF_PIPE_NO_PIPE; return TSDB_CODE_UDF_PIPE_NO_PIPE;
} }
...@@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { ...@@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp; SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode; int32_t err = task->errCode;
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
...@@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { ...@@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
taosMemoryFree(task->session); taosMemoryFree(task->session);
taosMemoryFree(task); taosMemoryFree(task);
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
return err; return err;
} }
...@@ -1651,25 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { ...@@ -1651,25 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} }
// int32_t code = doTeardownUdf(session);
// if (code != 0) {
// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code);
// }
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode; return udfCallCode == 0 ? numOfResults : udfCallCode;
} }
int32_t teardownUdfs() { int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0; int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
taosArrayPush(udfStubs, stub); taosArrayPush(udfStubs, stub);
} }
++i; ++i;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册