提交 44acc063 编写于 作者: S Shengliang Guan

TD-1204

上级 992b6feb
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_QUEUE_H
#define TDENGINE_HTTP_QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
bool httpInitResultQueue();
void httpCleanupResultQueue();
void httpDispatchToResultQueue();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "tnote.h"
#include "taos.h"
#include "tsclient.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSql.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SHttpWorker;
typedef struct {
int32_t num;
SHttpWorker *httpWorker;
} SHttpWorkerPool;
typedef struct {
void *param;
void *result;
int numOfRows;
void (*fp)(void *param, void *result, int numOfRows);
} SHttpResult;
static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset;
static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int numOfRows, void (*fp)(void *param, void *result, int numOfRows)) {
if (tsHttpQueue != NULL) {
SHttpResult *pMsg = (SHttpResult *)taosAllocateQitem(sizeof(SHttpResult));
pMsg->param = param;
pMsg->result = result;
pMsg->numOfRows = numOfRows;
pMsg->fp = fp;
taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg);
} else {
(*fp)(param, result, numOfRows);
}
}
static void *httpProcessResultQueue(void *param) {
SHttpResult *pMsg;
int32_t type;
void *unUsed;
while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("httpResultQueue: got no message from qset, exiting...");
break;
}
httpDebug("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result);
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows);
taosFreeQitem(pMsg);
}
return NULL;
}
static bool httpAllocateResultQueue() {
tsHttpQueue = taosOpenQueue();
if (tsHttpQueue == NULL) return false;
taosAddIntoQset(tsHttpQset, tsHttpQueue, NULL);
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, httpProcessResultQueue, pWorker) != 0) {
httpError("failed to create thread to process http result queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num);
}
httpInfo("http result queue is opened");
return true;
}
static void httpFreeResultQueue() {
taosCloseQueue(tsHttpQueue);
tsHttpQueue = NULL;
}
bool httpInitResultQueue() {
tsHttpQset = taosOpenQset();
tsHttpPool.num = tsHttpMaxThreads;
tsHttpPool.httpWorker = (SHttpWorker *)calloc(sizeof(SHttpWorker), tsHttpPool.num);
if (tsHttpPool.httpWorker == NULL) return -1;
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
}
return httpAllocateResultQueue();
}
void httpCleanupResultQueue() {
httpFreeResultQueue();
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsHttpQset);
}
}
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
taosCloseQset(tsHttpQset);
free(tsHttpPool.httpWorker);
httpInfo("http result queue is closed");
}
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpAuth.h" #include "httpAuth.h"
#include "httpSession.h" #include "httpSession.h"
#include "httpQueue.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
...@@ -75,7 +76,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO ...@@ -75,7 +76,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
} }
} }
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -154,6 +155,10 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { ...@@ -154,6 +155,10 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
} }
} }
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp);
}
void httpProcessMultiSql(HttpContext *pContext) { void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmds * multiCmds = pContext->multiCmds; HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
...@@ -196,7 +201,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { ...@@ -196,7 +201,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
httpProcessMultiSql(pContext); httpProcessMultiSql(pContext);
} }
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);
void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -243,7 +250,11 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num ...@@ -243,7 +250,11 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
} }
} }
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp);
}
void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -306,6 +317,10 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) ...@@ -306,6 +317,10 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode)
} }
} }
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp);
}
void httpProcessSingleSqlCmd(HttpContext *pContext) { void httpProcessSingleSqlCmd(HttpContext *pContext) {
HttpSqlCmd * cmd = &pContext->singleCmd; HttpSqlCmd * cmd = &pContext->singleCmd;
char * sql = cmd->nativSql; char * sql = cmd->nativSql;
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "httpServer.h" #include "httpServer.h"
#include "httpResp.h" #include "httpResp.h"
#include "httpHandle.h" #include "httpHandle.h"
#include "httpQueue.h"
#include "gcHandle.h" #include "gcHandle.h"
#include "restHandle.h" #include "restHandle.h"
#include "tgHandle.h" #include "tgHandle.h"
...@@ -67,6 +68,11 @@ int httpStartSystem() { ...@@ -67,6 +68,11 @@ int httpStartSystem() {
return -1; return -1;
} }
if (!httpInitResultQueue()) {
httpError("http init result queue failed");
return -1;
}
if (!httpInitContexts()) { if (!httpInitContexts()) {
httpError("http init contexts failed"); httpError("http init contexts failed");
return -1; return -1;
...@@ -98,6 +104,8 @@ void httpCleanUpSystem() { ...@@ -98,6 +104,8 @@ void httpCleanUpSystem() {
httpCleanUpConnect(); httpCleanUpConnect();
httpCleanupContexts(); httpCleanupContexts();
httpCleanUpSessions(); httpCleanUpSessions();
httpCleanupResultQueue();
pthread_mutex_destroy(&tsHttpServer.serverMutex); pthread_mutex_destroy(&tsHttpServer.serverMutex);
taosTFree(tsHttpServer.pThreads); taosTFree(tsHttpServer.pThreads);
tsHttpServer.pThreads = NULL; tsHttpServer.pThreads = NULL;
......
...@@ -32,9 +32,10 @@ system sh/cfg.sh -n dnode2 -c http -v 1 ...@@ -32,9 +32,10 @@ system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode4 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1
system sh/cfg.sh -n dnode1 -c httpMaxThreads -v 5
system sh/cfg.sh -n dnode1 -c firstEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c firstEp -v 127.0.0.1:6030
system sh/cfg.sh -n dnode1 -c secondEp -v 127.0.0.1:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 127.0.0.1:6030
system sh/cfg.sh -n dnode1 -c serverPort -v 6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030
system sh/cfg.sh -n dnode1 -c fqdn -v 127.0.0.1 system sh/cfg.sh -n dnode1 -c fqdn -v 127.0.0.1
#system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
...@@ -28,6 +28,6 @@ IF (TD_LINUX) ...@@ -28,6 +28,6 @@ IF (TD_LINUX)
#add_executable(createNormalTable createNormalTable.c) #add_executable(createNormalTable createNormalTable.c)
#target_link_libraries(createNormalTable taos_static tutil common pthread) #target_link_libraries(createNormalTable taos_static tutil common pthread)
#add_executable(queryPerformance queryPerformance.c) add_executable(queryPerformance queryPerformance.c)
#target_link_libraries(queryPerformance taos_static tutil common pthread) target_link_libraries(queryPerformance taos_static tutil common pthread)
ENDIF() ENDIF()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册