提交 1ee72aa5 编写于 作者: W wenzhouwww@live.cn

Merge branch '3.0' of github.com:taosdata/TDengine into test/udf_3.0

...@@ -5500,18 +5500,21 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { ...@@ -5500,18 +5500,21 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL; SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) { if (pJoinInfo->pLeft->info.blockId == blockId) {
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
rowIndex = pJoinInfo->leftPos;
} else { } else {
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
rowIndex = pJoinInfo->rightPos;
} }
if (colDataIsNull_s(pSrc, pJoinInfo->leftPos)) { if (colDataIsNull_s(pSrc, rowIndex)) {
colDataAppendNULL(pDst, nrows); colDataAppendNULL(pDst, nrows);
} else { } else {
char* p = colDataGetData(pSrc, pJoinInfo->leftPos); char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, nrows, p, false); colDataAppend(pDst, nrows, p, false);
} }
} }
......
...@@ -825,6 +825,11 @@ void onUdfcPipeClose(uv_handle_t *handle) { ...@@ -825,6 +825,11 @@ void onUdfcPipeClose(uv_handle_t *handle) {
taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn); taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *) handle); taosMemoryFree((uv_pipe_t *) handle);
//clear the udf handles cache
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
taosArrayClear(gUdfdProxy.udfStubs);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
} }
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
...@@ -1140,7 +1145,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1140,7 +1145,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
return code; return code;
} }
void udfClientAsyncCb(uv_async_t *async) { void udfcAsyncTaskCb(uv_async_t *async) {
SUdfcProxy *udfc = async->data; SUdfcProxy *udfc = async->data;
QUEUE wq; QUEUE wq;
...@@ -1204,7 +1209,7 @@ void constructUdfService(void *argsThread) { ...@@ -1204,7 +1209,7 @@ void constructUdfService(void *argsThread) {
SUdfcProxy *udfc = (SUdfcProxy *)argsThread; SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
uv_loop_init(&udfc->uvLoop); uv_loop_init(&udfc->uvLoop);
uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb); uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
udfc->loopTaskAync.data = udfc; udfc->loopTaskAync.data = udfc;
uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
udfc->loopStopAsync.data = udfc; udfc->loopStopAsync.data = udfc;
...@@ -1472,6 +1477,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, ...@@ -1472,6 +1477,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
return code; return code;
} }
//TODO: when to teardown udf. teardown udf is not called
int32_t doTeardownUdf(UdfcFuncHandle handle) { int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf func handle: %p", handle); fnInfo("tear down udf. udf func handle: %p", handle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册