未验证 提交 6e7e2b26 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12457 from taosdata/feature/udf

fix: core caused by invalid total rows of data block when executing sql like select udf2(f2) from udf.t2 group by 1-udf1(f1)
......@@ -310,28 +310,28 @@ enum {
};
int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy {
typedef struct SUdfcProxy {
char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_barrier_t gUdfInitBarrier;
uv_barrier_t initBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_loop_t uvLoop;
uv_thread_t loopThread;
uv_async_t loopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_async_t loopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int8_t gUdfcState;
QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue;
uv_mutex_t taskQueueMutex;
int8_t udfcState;
QUEUE taskQueue;
QUEUE uvProcTaskQueue;
int8_t initialized;
} SUdfdProxy;
} SUdfcProxy;
SUdfdProxy gUdfdProxy = {0};
SUdfcProxy gUdfdProxy = {0};
typedef struct SClientUdfUvSession {
SUdfdProxy *udfc;
SUdfcProxy *udfc;
int64_t severHandle;
uv_pipe_t *udfUvPipe;
......@@ -341,7 +341,7 @@ typedef struct SClientUdfUvSession {
} SClientUdfUvSession;
typedef struct SClientUvTaskNode {
SUdfdProxy *udfc;
SUdfcProxy *udfc;
int8_t type;
int errCode;
......@@ -1055,11 +1055,11 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_async_send(&udfc->gUdfLoopTaskAync);
SUdfcProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->taskQueueMutex);
uv_async_send(&udfc->loopTaskAync);
uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
......@@ -1073,7 +1073,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
......@@ -1113,46 +1113,46 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
}
void udfClientAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
SUdfcProxy *udfc = async->data;
QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
}
}
void cleanUpUvTasks(SUdfdProxy *udfc) {
void cleanUpUvTasks(SUdfcProxy *udfc) {
fnDebug("clean up uv tasks")
QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
}
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
......@@ -1160,28 +1160,28 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
}
void udfStopAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
SUdfcProxy *udfc = async->data;
cleanUpUvTasks(udfc);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->gUdfdLoop);
if (udfc->udfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->uvLoop);
}
}
void constructUdfService(void *argsThread) {
SUdfdProxy *udfc = (SUdfdProxy*)argsThread;
uv_loop_init(&udfc->gUdfdLoop);
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb);
udfc->gUdfLoopTaskAync.data = udfc;
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb);
udfc->gUdfLoopStopAsync.data = udfc;
uv_mutex_init(&udfc->gUdfTaskQueueMutex);
QUEUE_INIT(&udfc->gUdfTaskQueue);
QUEUE_INIT(&udfc->gUvProcTaskQueue);
uv_barrier_wait(&udfc->gUdfInitBarrier);
SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
uv_loop_init(&udfc->uvLoop);
uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb);
udfc->loopTaskAync.data = udfc;
uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
udfc->loopStopAsync.data = udfc;
uv_mutex_init(&udfc->taskQueueMutex);
QUEUE_INIT(&udfc->taskQueue);
QUEUE_INIT(&udfc->uvProcTaskQueue);
uv_barrier_wait(&udfc->initBarrier);
//TODO return value of uv_run
uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->gUdfdLoop);
uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->uvLoop);
}
int32_t udfcOpen() {
......@@ -1189,14 +1189,14 @@ int32_t udfcOpen() {
if (old == 1) {
return 0;
}
SUdfdProxy *proxy = &gUdfdProxy;
SUdfcProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY);
proxy->gUdfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->gUdfInitBarrier);
proxy->udfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->initBarrier, 2);
uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
proxy->udfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->initBarrier);
fnInfo("udfc initialized")
return 0;
}
......@@ -1207,13 +1207,13 @@ int32_t udfcClose() {
return 0;
}
SUdfdProxy *udfc = &gUdfdProxy;
udfc->gUdfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->gUdfLoopStopAsync);
uv_thread_join(&udfc->gUdfLoopThread);
uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
uv_barrier_destroy(&udfc->gUdfInitBarrier);
udfc->gUdfcState = UDFC_STATE_INITAL;
SUdfcProxy *udfc = &gUdfdProxy;
udfc->udfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->loopStopAsync);
uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier);
udfc->udfcState = UDFC_STATE_INITAL;
fnInfo("udfc cleaned up");
return 0;
}
......@@ -1236,7 +1236,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
......@@ -1484,7 +1484,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = numOfRows;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
......
......@@ -109,6 +109,7 @@ if $data01 != 18.547236991 then
endi
sql select udf2(udf1(f2-f1)), udf2(udf1(f2/f1)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
endi
......@@ -118,7 +119,19 @@ endi
if $data01 != 152.420471066 then
return -1
endi
print $rows , $data00 , $data01
sql select udf2(f2) from udf.t2 group by 1-udf1(f1);
print $rows , $data00 , $data10
if $rows != 2 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 12.083045974 then
return -1
endi
sql drop function udf1;
sql show functions;
if $rows != 1 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册