diff --git a/src/plugins/http/inc/httpQueue.h b/src/plugins/http/inc/httpQueue.h new file mode 100644 index 0000000000000000000000000000000000000000..a4590719ff24d48eee875b2f2c4ff2f28a0a31f6 --- /dev/null +++ b/src/plugins/http/inc/httpQueue.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_QUEUE_H +#define TDENGINE_HTTP_QUEUE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +bool httpInitResultQueue(); +void httpCleanupResultQueue(); +void httpDispatchToResultQueue(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c new file mode 100644 index 0000000000000000000000000000000000000000..9625102f7450daf409d35aa532267f3f999d80ab --- /dev/null +++ b/src/plugins/http/src/httpQueue.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tqueue.h" +#include "tnote.h" +#include "taos.h" +#include "tsclient.h" +#include "httpInt.h" +#include "httpContext.h" +#include "httpSql.h" +#include "httpResp.h" +#include "httpAuth.h" +#include "httpSession.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SHttpWorker; + +typedef struct { + int32_t num; + SHttpWorker *httpWorker; +} SHttpWorkerPool; + +typedef struct { + void *param; + void *result; + int numOfRows; + void (*fp)(void *param, void *result, int numOfRows); +} SHttpResult; + +static SHttpWorkerPool tsHttpPool; +static taos_qset tsHttpQset; +static taos_queue tsHttpQueue; + +void httpDispatchToResultQueue(void *param, TAOS_RES *result, int numOfRows, void (*fp)(void *param, void *result, int numOfRows)) { + if (tsHttpQueue != NULL) { + SHttpResult *pMsg = (SHttpResult *)taosAllocateQitem(sizeof(SHttpResult)); + pMsg->param = param; + pMsg->result = result; + pMsg->numOfRows = numOfRows; + pMsg->fp = fp; + taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg); + } else { + (*fp)(param, result, numOfRows); + } +} + +static void *httpProcessResultQueue(void *param) { + SHttpResult *pMsg; + int32_t type; + void *unUsed; + + while (1) { + if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { + httpDebug("httpResultQueue: got no message from qset, exiting..."); + break; + } + + httpDebug("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result); + (*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows); + taosFreeQitem(pMsg); + } + + return NULL; +} + +static bool httpAllocateResultQueue() { + tsHttpQueue = taosOpenQueue(); + if (tsHttpQueue == NULL) return false; + + taosAddIntoQset(tsHttpQset, tsHttpQueue, NULL); + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, httpProcessResultQueue, pWorker) != 0) { + httpError("failed to create thread to process http result queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num); + } + + httpInfo("http result queue is opened"); + return true; +} + +static void httpFreeResultQueue() { + taosCloseQueue(tsHttpQueue); + tsHttpQueue = NULL; +} + +bool httpInitResultQueue() { + tsHttpQset = taosOpenQset(); + + tsHttpPool.num = tsHttpMaxThreads; + tsHttpPool.httpWorker = (SHttpWorker *)calloc(sizeof(SHttpWorker), tsHttpPool.num); + + if (tsHttpPool.httpWorker == NULL) return -1; + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + pWorker->workerId = i; + } + + return httpAllocateResultQueue(); +} + +void httpCleanupResultQueue() { + httpFreeResultQueue(); + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsHttpQset); + } + } + + for (int32_t i = 0; i < tsHttpPool.num; ++i) { + SHttpWorker *pWorker = tsHttpPool.httpWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + taosCloseQset(tsHttpQset); + free(tsHttpPool.httpWorker); + + httpInfo("http result queue is closed"); +} diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 041fbdb92a6af689f14a71ff22e7537be64daa99..07cdea1380aaf40febc66f6d0e3891f7a577101d 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -24,12 +24,15 @@ #include "httpResp.h" #include "httpAuth.h" #include "httpSession.h" +#include "httpQueue.h" void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); void httpProcessMultiSql(HttpContext *pContext); -void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { +void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); + +void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -75,7 +78,11 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO } } -void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { +void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { + httpDispatchToResultQueue(param, result, numOfRows, httpProcessMultiSqlRetrieveCallBackImp); +} + +void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -154,6 +161,10 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { } } +void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { + httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp); +} + void httpProcessMultiSql(HttpContext *pContext) { HttpSqlCmds * multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -196,7 +207,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { httpProcessMultiSql(pContext); } -void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); + +void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -243,7 +256,11 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num } } -void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { + httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); +} + +void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -306,6 +323,10 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) } } +void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { + httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp); +} + void httpProcessSingleSqlCmd(HttpContext *pContext) { HttpSqlCmd * cmd = &pContext->singleCmd; char * sql = cmd->nativSql; diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 38bd8624b212d7c2004bf2eb89986be5e07cda5c..e51c8dd4f773397862483d9284e765a51d49c923 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -26,6 +26,7 @@ #include "httpServer.h" #include "httpResp.h" #include "httpHandle.h" +#include "httpQueue.h" #include "gcHandle.h" #include "restHandle.h" #include "tgHandle.h" @@ -67,6 +68,11 @@ int httpStartSystem() { return -1; } + if (!httpInitResultQueue()) { + httpError("http init result queue failed"); + return -1; + } + if (!httpInitContexts()) { httpError("http init contexts failed"); return -1; @@ -98,6 +104,8 @@ void httpCleanUpSystem() { httpCleanUpConnect(); httpCleanupContexts(); httpCleanUpSessions(); + httpCleanupResultQueue(); + pthread_mutex_destroy(&tsHttpServer.serverMutex); taosTFree(tsHttpServer.pThreads); tsHttpServer.pThreads = NULL; diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 9634175db7db30aa4e9aaad381861b2649d40a61..cc96f83f445a3b8e0ff1047b8fb0fd1d7721f7ce 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -327,7 +327,6 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe // no data, return directly if (pe->num == 0) { - assert(pe->next == NULL); __rd_unlock(&pHashObj->lock, pHashObj->type); return -1; } diff --git a/tests/script/tmp/182.sim b/tests/script/tmp/182.sim index 27e064dc9ba64603ca2e6a8db6629546af1b49b8..a178282cf835b236a202c71f1ef5a595e784324c 100644 --- a/tests/script/tmp/182.sim +++ b/tests/script/tmp/182.sim @@ -32,9 +32,10 @@ system sh/cfg.sh -n dnode2 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1 +system sh/cfg.sh -n dnode1 -c httpMaxThreads -v 4 system sh/cfg.sh -n dnode1 -c firstEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030 system sh/cfg.sh -n dnode1 -c fqdn -v 127.0.0.1 -#system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode1 -s start diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 536727100d906aeac636c63d5d5627f2bbd4dc24..e1fedaee3cecb8f55fa882c71b32ab49014bb5ea 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -28,6 +28,6 @@ IF (TD_LINUX) #add_executable(createNormalTable createNormalTable.c) #target_link_libraries(createNormalTable taos_static tutil common pthread) - #add_executable(queryPerformance queryPerformance.c) - #target_link_libraries(queryPerformance taos_static tutil common pthread) + add_executable(queryPerformance queryPerformance.c) + target_link_libraries(queryPerformance taos_static tutil common pthread) ENDIF() diff --git a/tests/test/c/queryPerformance.c b/tests/test/c/queryPerformance.c index 5e7a4333ded79e69d9fd2f8543bbe52a3c852f74..eda082dd4f293f0879603f6b71cc59150d6cfb3d 100644 --- a/tests/test/c/queryPerformance.c +++ b/tests/test/c/queryPerformance.c @@ -34,27 +34,41 @@ typedef struct { void *syncTest(void *param); void shellParseArgument(int argc, char *argv[]); -void insertData(); +void queryData(); -int64_t numOfThreads = 100; -char sql[10240] = "show dnodes"; -int32_t loopTimes = 1000; +int numOfThreads = 10; +int useGlobalConn = 1; +int requestPerThread = 10000; +char requestSql[10240] = "show dnodes"; +TAOS *globalConn; int main(int argc, char *argv[]) { shellParseArgument(argc, argv); taos_init(); - insertData(); + queryData(); } -void insertData() { +void queryData() { struct timeval systemTime; int64_t st, et; + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; + + if (useGlobalConn) { + taosGetFqdnPortFromEp(tsFirst, fqdn, &port); + + globalConn = taos_connect(fqdn, "root", "taosdata", NULL, port); + if (globalConn == NULL) { + pError("failed to connect to DB, reason:%s", taos_errstr(globalConn)); + exit(1); + } + } + + pPrint("%d threads are spawned to query", numOfThreads); gettimeofday(&systemTime, NULL); st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - pPrint("%" PRId64 " threads are spawned to query", numOfThreads); - pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -73,41 +87,41 @@ void insertData() { gettimeofday(&systemTime, NULL); et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - double mseconds = (et - st) / 1000.0; + double totalTimeMs = (et - st) / 1000.0; - int64_t request = loopTimes * numOfThreads; - float avg = mseconds / request;; - float qps = 1000 / avg * numOfThreads; - - pPrint( - "%sall threads:%ld finished, use %.1lf ms, qps:%f, avg:%f %s", - GREEN, numOfThreads, mseconds, qps, avg, NC); + int totalReq = requestPerThread * numOfThreads; + float rspTime = totalTimeMs / requestPerThread; + float qps = totalReq / (totalTimeMs / 1000); - pPrint("threads exit"); + pPrint("%s threads:%d, totalTime %.1fms totalReq:%d qps:%.1f rspTime:%.3fms %s", GREEN, numOfThreads, totalTimeMs, + totalReq, qps, rspTime, NC); pthread_attr_destroy(&thattr); free(pInfo); } void *syncTest(void *param) { - TAOS * con; - SInfo * pInfo = (SInfo *)param; - struct timeval systemTime; - - pPrint("thread:%d, start to run", pInfo->threadIndex); + TAOS * con; + SInfo * pInfo = (SInfo *)param; char fqdn[TSDB_FQDN_LEN]; uint16_t port; - taosGetFqdnPortFromEp(tsFirst, fqdn, &port); - - con = taos_connect(fqdn, "root", "taosdata", NULL, port); - if (con == NULL) { - pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); - exit(1); + if (useGlobalConn) { + pPrint("thread:%d, start to run use global connection", pInfo->threadIndex); + con = globalConn; + } else { + pPrint("thread:%d, start to run, and create new conn", pInfo->threadIndex); + taosGetFqdnPortFromEp(tsFirst, fqdn, &port); + + con = taos_connect(fqdn, "root", "taosdata", NULL, port); + if (con == NULL) { + pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + exit(1); + } } - for (int i = 0; i < loopTimes; ++i) { - void *tres = taos_query(con, sql); + for (int i = 0; i < requestPerThread; ++i) { + void *tres = taos_query(con, requestSql); TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { @@ -117,13 +131,10 @@ void *syncTest(void *param) { do { row = taos_fetch_row(tres); - } while( row != NULL); + } while (row != NULL); taos_free_result(tres); } - - gettimeofday(&systemTime, NULL); - return NULL; } @@ -134,12 +145,14 @@ void printHelp() { printf("%s%s\n", indent, "-c"); printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); printf("%s%s\n", indent, "-s"); - printf("%s%s%s%s\n", indent, indent, "The sql to be executed, default is %s", sql); - printf("%s%s\n", indent, "-l"); - printf("%s%s%s%d\n", indent, indent, "Loop Times per thread, default is ", loopTimes); + printf("%s%s%s%s\n", indent, indent, "The sql to be executed, default is ", requestSql); + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%d\n", indent, indent, "Request per thread, default is ", requestPerThread); printf("%s%s\n", indent, "-t"); - printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of threads to be used, default is ", numOfThreads); - + printf("%s%s%s%d\n", indent, indent, "Number of threads to be used, default is ", numOfThreads); + printf("%s%s\n", indent, "-g"); + printf("%s%s%s%d\n", indent, indent, "Whether to share connections between threads, default is ", useGlobalConn); + exit(EXIT_SUCCESS); } @@ -151,17 +164,20 @@ void shellParseArgument(int argc, char *argv[]) { } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-s") == 0) { - strcpy(sql, argv[++i]); - } else if (strcmp(argv[i], "-l") == 0) { - loopTimes = atoi(argv[++i]); + strcpy(requestSql, argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + requestPerThread = atoi(argv[++i]); } else if (strcmp(argv[i], "-t") == 0) { numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-g") == 0) { + useGlobalConn = atoi(argv[++i]); } else { } } - pPrint("%ssql:%s%s", GREEN, sql, NC); - pPrint("%sloopTImes:%d%s", GREEN, loopTimes, NC); - pPrint("%snumOfThreads:%" PRId64 "%s", GREEN, numOfThreads, NC); - pPrint("%sstart to run%s", GREEN, NC); + pPrint("%s sql:%s %s", GREEN, requestSql, NC); + pPrint("%s requestPerThread:%d %s", GREEN, requestPerThread, NC); + pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC); + pPrint("%s useGlobalConn:%d %s", GREEN, useGlobalConn, NC); + pPrint("%s start to run %s", GREEN, NC); }