未验证 提交 d8b0d405 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3344 from taosdata/feature/os

Feature/os
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_QUEUE_H
#define TDENGINE_HTTP_QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
bool httpInitResultQueue();
void httpCleanupResultQueue();
void httpDispatchToResultQueue();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "tnote.h"
#include "taos.h"
#include "tsclient.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSql.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SHttpWorker;
typedef struct {
int32_t num;
SHttpWorker *httpWorker;
} SHttpWorkerPool;
typedef struct {
void *param;
void *result;
int numOfRows;
void (*fp)(void *param, void *result, int numOfRows);
} SHttpResult;
static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset;
static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int numOfRows, void (*fp)(void *param, void *result, int numOfRows)) {
if (tsHttpQueue != NULL) {
SHttpResult *pMsg = (SHttpResult *)taosAllocateQitem(sizeof(SHttpResult));
pMsg->param = param;
pMsg->result = result;
pMsg->numOfRows = numOfRows;
pMsg->fp = fp;
taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg);
} else {
(*fp)(param, result, numOfRows);
}
}
static void *httpProcessResultQueue(void *param) {
SHttpResult *pMsg;
int32_t type;
void *unUsed;
while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("httpResultQueue: got no message from qset, exiting...");
break;
}
httpDebug("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result);
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows);
taosFreeQitem(pMsg);
}
return NULL;
}
static bool httpAllocateResultQueue() {
tsHttpQueue = taosOpenQueue();
if (tsHttpQueue == NULL) return false;
taosAddIntoQset(tsHttpQset, tsHttpQueue, NULL);
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, httpProcessResultQueue, pWorker) != 0) {
httpError("failed to create thread to process http result queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num);
}
httpInfo("http result queue is opened");
return true;
}
static void httpFreeResultQueue() {
taosCloseQueue(tsHttpQueue);
tsHttpQueue = NULL;
}
bool httpInitResultQueue() {
tsHttpQset = taosOpenQset();
tsHttpPool.num = tsHttpMaxThreads;
tsHttpPool.httpWorker = (SHttpWorker *)calloc(sizeof(SHttpWorker), tsHttpPool.num);
if (tsHttpPool.httpWorker == NULL) return -1;
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
}
return httpAllocateResultQueue();
}
void httpCleanupResultQueue() {
httpFreeResultQueue();
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsHttpQset);
}
}
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
taosCloseQset(tsHttpQset);
free(tsHttpPool.httpWorker);
httpInfo("http result queue is closed");
}
......@@ -24,12 +24,15 @@
#include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
#include "httpQueue.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);
void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return;
......@@ -75,7 +78,11 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
}
}
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessMultiSqlRetrieveCallBackImp);
}
void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return;
......@@ -154,6 +161,10 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
}
}
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp);
}
void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -196,7 +207,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
httpProcessMultiSql(pContext);
}
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);
void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return;
......@@ -243,7 +256,11 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
}
}
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp);
}
void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return;
......@@ -306,6 +323,10 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode)
}
}
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp);
}
void httpProcessSingleSqlCmd(HttpContext *pContext) {
HttpSqlCmd * cmd = &pContext->singleCmd;
char * sql = cmd->nativSql;
......
......@@ -26,6 +26,7 @@
#include "httpServer.h"
#include "httpResp.h"
#include "httpHandle.h"
#include "httpQueue.h"
#include "gcHandle.h"
#include "restHandle.h"
#include "tgHandle.h"
......@@ -67,6 +68,11 @@ int httpStartSystem() {
return -1;
}
if (!httpInitResultQueue()) {
httpError("http init result queue failed");
return -1;
}
if (!httpInitContexts()) {
httpError("http init contexts failed");
return -1;
......@@ -98,6 +104,8 @@ void httpCleanUpSystem() {
httpCleanUpConnect();
httpCleanupContexts();
httpCleanUpSessions();
httpCleanupResultQueue();
pthread_mutex_destroy(&tsHttpServer.serverMutex);
taosTFree(tsHttpServer.pThreads);
tsHttpServer.pThreads = NULL;
......
......@@ -327,7 +327,6 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
// no data, return directly
if (pe->num == 0) {
assert(pe->next == NULL);
__rd_unlock(&pHashObj->lock, pHashObj->type);
return -1;
}
......
......@@ -32,9 +32,10 @@ system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode4 -c http -v 1
system sh/cfg.sh -n dnode1 -c httpMaxThreads -v 4
system sh/cfg.sh -n dnode1 -c firstEp -v 127.0.0.1:6030
system sh/cfg.sh -n dnode1 -c secondEp -v 127.0.0.1:6030
system sh/cfg.sh -n dnode1 -c serverPort -v 6030
system sh/cfg.sh -n dnode1 -c fqdn -v 127.0.0.1
#system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
......@@ -28,6 +28,6 @@ IF (TD_LINUX)
#add_executable(createNormalTable createNormalTable.c)
#target_link_libraries(createNormalTable taos_static tutil common pthread)
#add_executable(queryPerformance queryPerformance.c)
#target_link_libraries(queryPerformance taos_static tutil common pthread)
add_executable(queryPerformance queryPerformance.c)
target_link_libraries(queryPerformance taos_static tutil common pthread)
ENDIF()
......@@ -34,27 +34,41 @@ typedef struct {
void *syncTest(void *param);
void shellParseArgument(int argc, char *argv[]);
void insertData();
void queryData();
int64_t numOfThreads = 100;
char sql[10240] = "show dnodes";
int32_t loopTimes = 1000;
int numOfThreads = 10;
int useGlobalConn = 1;
int requestPerThread = 10000;
char requestSql[10240] = "show dnodes";
TAOS *globalConn;
int main(int argc, char *argv[]) {
shellParseArgument(argc, argv);
taos_init();
insertData();
queryData();
}
void insertData() {
void queryData() {
struct timeval systemTime;
int64_t st, et;
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
if (useGlobalConn) {
taosGetFqdnPortFromEp(tsFirst, fqdn, &port);
globalConn = taos_connect(fqdn, "root", "taosdata", NULL, port);
if (globalConn == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(globalConn));
exit(1);
}
}
pPrint("%d threads are spawned to query", numOfThreads);
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
pPrint("%" PRId64 " threads are spawned to query", numOfThreads);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
......@@ -73,41 +87,41 @@ void insertData() {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
double mseconds = (et - st) / 1000.0;
double totalTimeMs = (et - st) / 1000.0;
int64_t request = loopTimes * numOfThreads;
float avg = mseconds / request;;
float qps = 1000 / avg * numOfThreads;
pPrint(
"%sall threads:%ld finished, use %.1lf ms, qps:%f, avg:%f %s",
GREEN, numOfThreads, mseconds, qps, avg, NC);
int totalReq = requestPerThread * numOfThreads;
float rspTime = totalTimeMs / requestPerThread;
float qps = totalReq / (totalTimeMs / 1000);
pPrint("threads exit");
pPrint("%s threads:%d, totalTime %.1fms totalReq:%d qps:%.1f rspTime:%.3fms %s", GREEN, numOfThreads, totalTimeMs,
totalReq, qps, rspTime, NC);
pthread_attr_destroy(&thattr);
free(pInfo);
}
void *syncTest(void *param) {
TAOS * con;
SInfo * pInfo = (SInfo *)param;
struct timeval systemTime;
pPrint("thread:%d, start to run", pInfo->threadIndex);
TAOS * con;
SInfo * pInfo = (SInfo *)param;
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsFirst, fqdn, &port);
con = taos_connect(fqdn, "root", "taosdata", NULL, port);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
if (useGlobalConn) {
pPrint("thread:%d, start to run use global connection", pInfo->threadIndex);
con = globalConn;
} else {
pPrint("thread:%d, start to run, and create new conn", pInfo->threadIndex);
taosGetFqdnPortFromEp(tsFirst, fqdn, &port);
con = taos_connect(fqdn, "root", "taosdata", NULL, port);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
}
}
for (int i = 0; i < loopTimes; ++i) {
void *tres = taos_query(con, sql);
for (int i = 0; i < requestPerThread; ++i) {
void *tres = taos_query(con, requestSql);
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
......@@ -117,13 +131,10 @@ void *syncTest(void *param) {
do {
row = taos_fetch_row(tres);
} while( row != NULL);
} while (row != NULL);
taos_free_result(tres);
}
gettimeofday(&systemTime, NULL);
return NULL;
}
......@@ -134,12 +145,14 @@ void printHelp() {
printf("%s%s\n", indent, "-c");
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
printf("%s%s\n", indent, "-s");
printf("%s%s%s%s\n", indent, indent, "The sql to be executed, default is %s", sql);
printf("%s%s\n", indent, "-l");
printf("%s%s%s%d\n", indent, indent, "Loop Times per thread, default is ", loopTimes);
printf("%s%s%s%s\n", indent, indent, "The sql to be executed, default is ", requestSql);
printf("%s%s\n", indent, "-r");
printf("%s%s%s%d\n", indent, indent, "Request per thread, default is ", requestPerThread);
printf("%s%s\n", indent, "-t");
printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of threads to be used, default is ", numOfThreads);
printf("%s%s%s%d\n", indent, indent, "Number of threads to be used, default is ", numOfThreads);
printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "Whether to share connections between threads, default is ", useGlobalConn);
exit(EXIT_SUCCESS);
}
......@@ -151,17 +164,20 @@ void shellParseArgument(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-s") == 0) {
strcpy(sql, argv[++i]);
} else if (strcmp(argv[i], "-l") == 0) {
loopTimes = atoi(argv[++i]);
strcpy(requestSql, argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
requestPerThread = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) {
useGlobalConn = atoi(argv[++i]);
} else {
}
}
pPrint("%ssql:%s%s", GREEN, sql, NC);
pPrint("%sloopTImes:%d%s", GREEN, loopTimes, NC);
pPrint("%snumOfThreads:%" PRId64 "%s", GREEN, numOfThreads, NC);
pPrint("%sstart to run%s", GREEN, NC);
pPrint("%s sql:%s %s", GREEN, requestSql, NC);
pPrint("%s requestPerThread:%d %s", GREEN, requestPerThread, NC);
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s useGlobalConn:%d %s", GREEN, useGlobalConn, NC);
pPrint("%s start to run %s", GREEN, NC);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册