提交 86d78a29 编写于 作者: S Shengliang Guan

TD-2044

上级 a5fdb787
...@@ -22,9 +22,11 @@ extern "C" { ...@@ -22,9 +22,11 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
typedef void (*FHttpResultFp)(void *param, void *result, int32_t code, int32_t rows);
bool httpInitResultQueue(); bool httpInitResultQueue();
void httpCleanupResultQueue(); void httpCleanupResultQueue();
void httpDispatchToResultQueue(); void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpAuth.h" #include "httpAuth.h"
#include "httpSession.h" #include "httpSession.h"
#include "httpQueue.h"
typedef struct { typedef struct {
pthread_t thread; pthread_t thread;
...@@ -37,26 +38,28 @@ typedef struct { ...@@ -37,26 +38,28 @@ typedef struct {
} SHttpWorkerPool; } SHttpWorkerPool;
typedef struct { typedef struct {
void *param; void * param;
void *result; void * result;
int32_t numOfRows; int32_t code;
void (*fp)(void *param, void *result, int32_t numOfRows); int32_t rows;
FHttpResultFp fp;
} SHttpResult; } SHttpResult;
static SHttpWorkerPool tsHttpPool; static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset; static taos_qset tsHttpQset;
static taos_queue tsHttpQueue; static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t numOfRows, void (*fp)(void *param, void *result, int32_t numOfRows)) { void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) {
if (tsHttpQueue != NULL) { if (tsHttpQueue != NULL) {
SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult)); SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult));
pMsg->param = param; pMsg->param = param;
pMsg->result = result; pMsg->result = result;
pMsg->numOfRows = numOfRows; pMsg->code = code;
pMsg->rows = rows;
pMsg->fp = fp; pMsg->fp = fp;
taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg);
} else { } else {
(*fp)(param, result, numOfRows); (*fp)(param, result, code, rows);
} }
} }
...@@ -71,8 +74,9 @@ static void *httpProcessResultQueue(void *param) { ...@@ -71,8 +74,9 @@ static void *httpProcessResultQueue(void *param) {
break; break;
} }
httpTrace("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result); httpTrace("context:%p, res:%p will be processed in result queue, code:%d rows:%d", pMsg->param, pMsg->result,
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows); pMsg->code, pMsg->rows);
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->code, pMsg->rows);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
......
...@@ -31,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext); ...@@ -31,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);
void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int code, int numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -43,7 +43,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n ...@@ -43,7 +43,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
bool isContinue = false; bool isContinue = false;
if (numOfRows > 0) { if (code == TSDB_CODE_SUCCESS && numOfRows > 0) {
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->buildQueryJsonFp) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->buildQueryJsonFp) {
isContinue = (encode->buildQueryJsonFp)(pContext, singleCmd, result, numOfRows); isContinue = (encode->buildQueryJsonFp)(pContext, singleCmd, result, numOfRows);
} }
...@@ -58,9 +58,9 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n ...@@ -58,9 +58,9 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd,
pContext->user, multiCmds->pos, numOfRows, sql); pContext->user, multiCmds->pos, numOfRows, sql);
if (numOfRows < 0) { if (code < 0) {
httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd,
pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); pContext->user, multiCmds->pos, tstrerror(code), sql);
} }
taos_free_result(result); taos_free_result(result);
...@@ -74,14 +74,14 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n ...@@ -74,14 +74,14 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
} }
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessMultiSqlRetrieveCallBackImp); int32_t code = taos_errno(result);
httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessMultiSqlRetrieveCallBackImp);
} }
void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code, int affectRowsInput) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
code = taos_errno(result);
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
...@@ -94,7 +94,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { ...@@ -94,7 +94,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
return; return;
} }
if (code < 0) { if (code != TSDB_CODE_SUCCESS) {
if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) { if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) {
singleCmd->code = code; singleCmd->code = code;
httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd,
...@@ -157,7 +157,9 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { ...@@ -157,7 +157,9 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
} }
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp); int32_t code = taos_errno(result);
int32_t affectRows = taos_affected_rows(result);
httpDispatchToResultQueue(param, result, code, affectRows, httpProcessMultiSqlCallBackImp);
} }
void httpProcessMultiSql(HttpContext *pContext) { void httpProcessMultiSql(HttpContext *pContext) {
...@@ -204,7 +206,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { ...@@ -204,7 +206,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);
void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int code, int numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
...@@ -212,7 +214,7 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int ...@@ -212,7 +214,7 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
bool isContinue = false; bool isContinue = false;
if (numOfRows > 0) { if (code == TSDB_CODE_SUCCESS && numOfRows > 0) {
if (encode->buildQueryJsonFp) { if (encode->buildQueryJsonFp) {
isContinue = (encode->buildQueryJsonFp)(pContext, &pContext->singleCmd, result, numOfRows); isContinue = (encode->buildQueryJsonFp)(pContext, &pContext->singleCmd, result, numOfRows);
} }
...@@ -227,9 +229,9 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int ...@@ -227,9 +229,9 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user, httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user,
numOfRows); numOfRows);
if (numOfRows < 0) { if (code < 0) {
httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user, httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user,
tstrerror(numOfRows)); tstrerror(code));
} }
taos_free_result(result); taos_free_result(result);
...@@ -243,29 +245,29 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int ...@@ -243,29 +245,29 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
} }
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); int32_t code = taos_errno(result);
httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessSingleSqlRetrieveCallBackImp);
} }
void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int code, int affectRowsInput) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
int32_t code = taos_errno(result);
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), (SSqlObj *)result); pContext->user, tstrerror(code), result);
return; return;
} }
if (code < 0) { if (code != TSDB_CODE_SUCCESS) {
SSqlObj *pObj = (SSqlObj *)result; SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) { if (code == TSDB_CODE_TSC_INVALID_SQL) {
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, terrno = code;
pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload); httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd,
httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); pContext->user, tstrerror(code), pObj, taos_errstr(pObj));
httpSendTaosdInvalidSqlErrorResp(pContext, taos_errstr(pObj));
} else { } else {
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), pObj); pContext->user, tstrerror(code), pObj);
...@@ -279,6 +281,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo ...@@ -279,6 +281,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
if (isUpdate) { if (isUpdate) {
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int affectRows = taos_affected_rows(result);
assert(affectRows == affectRowsInput);
httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd,
pContext->user, affectRows, result); pContext->user, affectRows, result);
...@@ -309,7 +312,9 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo ...@@ -309,7 +312,9 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
} }
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp); int32_t code = taos_errno(result);
int32_t affectRows = taos_affected_rows(result);
httpDispatchToResultQueue(param, result, code, affectRows, httpProcessSingleSqlCallBackImp);
} }
void httpProcessSingleSqlCmd(HttpContext *pContext) { void httpProcessSingleSqlCmd(HttpContext *pContext) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册