diff --git a/src/plugins/http/inc/httpQueue.h b/src/plugins/http/inc/httpQueue.h new file mode 100644 index 0000000000000000000000000000000000000000..a4590719ff24d48eee875b2f2c4ff2f28a0a31f6 --- /dev/null +++ b/src/plugins/http/inc/httpQueue.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_QUEUE_H +#define TDENGINE_HTTP_QUEUE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +bool httpInitResultQueue(); +void httpCleanupResultQueue(); +void httpDispatchToResultQueue(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c new file mode 100644 index 0000000000000000000000000000000000000000..9625102f7450daf409d35aa532267f3f999d80ab --- /dev/null +++ b/src/plugins/http/src/httpQueue.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tqueue.h" +#include "tnote.h" +#include "taos.h" +#include "tsclient.h" +#include "httpInt.h" +#include "httpContext.h" +#include "httpSql.h" +#include "httpResp.h" +#include "httpAuth.h" +#include "httpSession.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SHttpWorker; + +typedef struct { + int32_t num; + SHttpWorker *httpWorker; +} SHttpWorkerPool; + +typedef struct { + void *param; + void *result; + int numOfRows; + void (*fp)(void *param, void *result, int numOfRows); +} SHttpResult; + +static SHttpWorkerPool tsHttpPool; +static taos_qset tsHttpQset; +static taos_queue tsHttpQueue; + +void httpDispatchToResultQueue(void *param, TAOS_RES *result, int numOfRows, void (*fp)(void *param, void *result, int numOfRows)) { + if (tsHttpQueue != NULL) { + SHttpResult *pMsg = (SHttpResult *)taosAllocateQitem(sizeof(SHttpResult)); + pMsg->param = param; + pMsg->result = result; + pMsg->numOfRows = numOfRows; + pMsg->fp = fp; + taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg); + } else { + (*fp)(param, result, numOfRows); + } +} + +static void *httpProcessResultQueue(void *param) { + SHttpResult *pMsg; + int32_t type; + void *unUsed; + + while (1) { + if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { + httpDebug("httpResultQueue: got no message from qset, exiting..."); + break; + } + + httpDebug("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result); + (*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows); + taosFreeQitem(pMsg); + } + + return NULL; +} + +static bool httpAllocateResultQueue() { + tsHttpQueue = taosOpenQueue(); + if (tsHttpQueue == NULL) return false; + + taosAddIntoQset(tsHttpQset, tsHttpQueue, NULL); + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, httpProcessResultQueue, pWorker) != 0) { + httpError("failed to create thread to process http result queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num); + } + + httpInfo("http result queue is opened"); + return true; +} + +static void httpFreeResultQueue() { + taosCloseQueue(tsHttpQueue); + tsHttpQueue = NULL; +} + +bool httpInitResultQueue() { + tsHttpQset = taosOpenQset(); + + tsHttpPool.num = tsHttpMaxThreads; + tsHttpPool.httpWorker = (SHttpWorker *)calloc(sizeof(SHttpWorker), tsHttpPool.num); + + if (tsHttpPool.httpWorker == NULL) return -1; + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + pWorker->workerId = i; + } + + return httpAllocateResultQueue(); +} + +void httpCleanupResultQueue() { + httpFreeResultQueue(); + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsHttpQset); + } + } + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + taosCloseQset(tsHttpQset); + free(tsHttpPool.httpWorker); + + httpInfo("http result queue is closed"); +} diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 041fbdb92a6af689f14a71ff22e7537be64daa99..558652b4215085c34116104ee47ae766d8df276c 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -24,6 +24,7 @@ #include "httpResp.h" #include "httpAuth.h" #include "httpSession.h" +#include "httpQueue.h" void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); @@ -75,7 +76,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO } } -void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { +void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -154,6 +155,10 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { } } +void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { + httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp); +} + void httpProcessMultiSql(HttpContext *pContext) { HttpSqlCmds * multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -196,7 +201,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { httpProcessMultiSql(pContext); } -void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); + +void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -243,7 +250,11 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num } } -void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { + httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); +} + +void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -306,6 +317,10 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) } } +void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { + httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp); +} + void httpProcessSingleSqlCmd(HttpContext *pContext) { HttpSqlCmd * cmd = &pContext->singleCmd; char * sql = cmd->nativSql; diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 38bd8624b212d7c2004bf2eb89986be5e07cda5c..e51c8dd4f773397862483d9284e765a51d49c923 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -26,6 +26,7 @@ #include "httpServer.h" #include "httpResp.h" #include "httpHandle.h" +#include "httpQueue.h" #include "gcHandle.h" #include "restHandle.h" #include "tgHandle.h" @@ -67,6 +68,11 @@ int httpStartSystem() { return -1; } + if (!httpInitResultQueue()) { + httpError("http init result queue failed"); + return -1; + } + if (!httpInitContexts()) { httpError("http init contexts failed"); return -1; @@ -98,6 +104,8 @@ void httpCleanUpSystem() { httpCleanUpConnect(); httpCleanupContexts(); httpCleanUpSessions(); + httpCleanupResultQueue(); + pthread_mutex_destroy(&tsHttpServer.serverMutex); taosTFree(tsHttpServer.pThreads); tsHttpServer.pThreads = NULL; diff --git a/tests/script/tmp/182.sim b/tests/script/tmp/182.sim index 27e064dc9ba64603ca2e6a8db6629546af1b49b8..69742f4bfab6e02ea060e193c49798c05fd7837c 100644 --- a/tests/script/tmp/182.sim +++ b/tests/script/tmp/182.sim @@ -32,9 +32,10 @@ system sh/cfg.sh -n dnode2 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1 +system sh/cfg.sh -n dnode1 -c httpMaxThreads -v 5 system sh/cfg.sh -n dnode1 -c firstEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030 system sh/cfg.sh -n dnode1 -c fqdn -v 127.0.0.1 -#system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode1 -s start diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 536727100d906aeac636c63d5d5627f2bbd4dc24..e1fedaee3cecb8f55fa882c71b32ab49014bb5ea 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -28,6 +28,6 @@ IF (TD_LINUX) #add_executable(createNormalTable createNormalTable.c) #target_link_libraries(createNormalTable taos_static tutil common pthread) - #add_executable(queryPerformance queryPerformance.c) - #target_link_libraries(queryPerformance taos_static tutil common pthread) + add_executable(queryPerformance queryPerformance.c) + target_link_libraries(queryPerformance taos_static tutil common pthread) ENDIF()