提交 6fb53b30 编写于 作者: S Shengliang Guan

TD-1311

上级 5fa8bc80
...@@ -31,7 +31,4 @@ void httpCloseContextByApp(HttpContext *pContext); ...@@ -31,7 +31,4 @@ void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext); void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void ehttpIncContextRef(HttpContext *pContext);
void ehttpDecContextRef(HttpContext **ppContext);
#endif #endif
...@@ -212,8 +212,6 @@ typedef struct HttpContext { ...@@ -212,8 +212,6 @@ typedef struct HttpContext {
void * timer; void * timer;
HttpEncodeMethod * encodeMethod; HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread; struct HttpThread *pThread;
int closed:2;
} HttpContext; } HttpContext;
typedef struct HttpThread { typedef struct HttpThread {
...@@ -244,8 +242,6 @@ typedef struct HttpServer { ...@@ -244,8 +242,6 @@ typedef struct HttpServer {
pthread_mutex_t serverMutex; pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext); bool (*processData)(HttpContext *pContext);
int fallback:2;
} HttpServer; } HttpServer;
extern const char *httpKeepAliveStr[]; extern const char *httpKeepAliveStr[];
......
...@@ -67,8 +67,7 @@ bool gcGetPassFromUrl(HttpContext* pContext) { ...@@ -67,8 +67,7 @@ bool gcGetPassFromUrl(HttpContext* pContext) {
} }
bool gcProcessLoginRequest(HttpContext* pContext) { bool gcProcessLoginRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process grafana login msg", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, process grafana login msg", pContext, pContext->fd, pContext->user);
pContext->user);
pContext->reqType = HTTP_REQTYPE_LOGIN; pContext->reqType = HTTP_REQTYPE_LOGIN;
return true; return true;
} }
...@@ -143,7 +142,7 @@ bool gcProcessLoginRequest(HttpContext* pContext) { ...@@ -143,7 +142,7 @@ bool gcProcessLoginRequest(HttpContext* pContext) {
//}] //}]
bool gcProcessQueryRequest(HttpContext* pContext) { bool gcProcessQueryRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, ip:%s, process grafana query msg", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, process grafana query msg", pContext, pContext->fd);
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
char* filter = pParser->data.pos; char* filter = pParser->data.pos;
...@@ -183,15 +182,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -183,15 +182,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
cJSON* refId = cJSON_GetObjectItem(query, "refId"); cJSON* refId = cJSON_GetObjectItem(query, "refId");
if (refId == NULL || refId->valuestring == NULL || strlen(refId->valuestring) == 0) { if (refId == NULL || refId->valuestring == NULL || strlen(refId->valuestring) == 0) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, refId is null", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, refId is null", pContext, pContext->fd, pContext->user);
pContext->user);
continue; continue;
} }
int refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring); int refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring);
if (refIdBuffer == -1) { if (refIdBuffer == -1) {
httpWarn("context:%p, fd:%d, ip:%s, user:%s, refId buffer is full", pContext, pContext->fd, pContext->ipstr, httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user);
pContext->user);
break; break;
} }
...@@ -200,8 +197,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -200,8 +197,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) {
aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring);
if (aliasBuffer == -1) { if (aliasBuffer == -1) {
httpWarn("context:%p, fd:%d, ip:%s, user:%s, alias buffer is full", pContext, pContext->fd, pContext->ipstr, httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user);
pContext->user);
break; break;
} }
} }
...@@ -211,15 +207,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -211,15 +207,13 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
cJSON* sql = cJSON_GetObjectItem(query, "sql"); cJSON* sql = cJSON_GetObjectItem(query, "sql");
if (sql == NULL || sql->valuestring == NULL || strlen(sql->valuestring) == 0) { if (sql == NULL || sql->valuestring == NULL || strlen(sql->valuestring) == 0) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, sql is null", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, sql is null", pContext, pContext->fd, pContext->user);
pContext->user);
continue; continue;
} }
int sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring); int sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring);
if (sqlBuffer == -1) { if (sqlBuffer == -1) {
httpWarn("context:%p, fd:%d, ip:%s, user:%s, sql buffer is full", pContext, pContext->fd, pContext->ipstr, httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user);
pContext->user);
break; break;
} }
...@@ -237,8 +231,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -237,8 +231,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
cmd->timestamp = httpAddToSqlCmdBufferWithSize(pContext, HTTP_GC_TARGET_SIZE + 1); // hack way cmd->timestamp = httpAddToSqlCmdBufferWithSize(pContext, HTTP_GC_TARGET_SIZE + 1); // hack way
if (cmd->timestamp == -1) { if (cmd->timestamp == -1) {
httpWarn("context:%p, fd:%d, ip:%s, user:%s, cant't malloc target size, sql buffer is full", httpWarn("context:%p, fd:%d, user:%s, cant't malloc target size, sql buffer is full", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user); pContext->user);
break; break;
} }
} }
...@@ -251,7 +245,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -251,7 +245,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
} }
bool gcProcessHeartbeatRequest(HttpContext* pContext) { bool gcProcessHeartbeatRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, ip:%s, process grafana heartbeat msg", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, process grafana heartbeat msg", pContext, pContext->fd);
pContext->reqType = HTTP_REQTYPE_HEARTBEAT; pContext->reqType = HTTP_REQTYPE_HEARTBEAT;
pContext->encodeMethod = &gcHeartBeatMethod; pContext->encodeMethod = &gcHeartBeatMethod;
return true; return true;
......
...@@ -28,23 +28,21 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -28,23 +28,21 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
int outlen = 0; int outlen = 0;
char *base64 = (char *)base64_decode(token, len, &outlen); char *base64 = (char *)base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, basic token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); httpError("context:%p, fd:%d, basic token:%s parsed error", pContext, pContext->fd, token);
free(base64); free(base64);
return false; return false;
} }
char *user = strstr(base64, ":"); char *user = strstr(base64, ":");
if (user == NULL) { if (user == NULL) {
httpError("context:%p, fd:%d, ip:%s, basic token:%s invalid format", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, basic token:%s invalid format", pContext, pContext->fd, token);
token);
free(base64); free(base64);
return false; return false;
} }
int user_len = (int)(user - base64); int user_len = (int)(user - base64);
if (user_len < 1 || user_len >= TSDB_USER_LEN) { if (user_len < 1 || user_len >= TSDB_USER_LEN) {
httpError("context:%p, fd:%d, ip:%s, basic token:%s parse user error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, basic token:%s parse user error", pContext, pContext->fd, token);
token);
free(base64); free(base64);
return false; return false;
} }
...@@ -54,8 +52,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -54,8 +52,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
char *password = user + 1; char *password = user + 1;
int pass_len = (int)((base64 + outlen) - password); int pass_len = (int)((base64 + outlen) - password);
if (pass_len < 1 || pass_len >= TSDB_PASSWORD_LEN) { if (pass_len < 1 || pass_len >= TSDB_PASSWORD_LEN) {
httpError("context:%p, fd:%d, ip:%s, basic token:%s parse password error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, basic token:%s parse password error", pContext, pContext->fd, token);
token);
free(base64); free(base64);
return false; return false;
} }
...@@ -63,8 +60,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -63,8 +60,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
pContext->pass[pass_len] = 0; pContext->pass[pass_len] = 0;
free(base64); free(base64);
httpDebug("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, basic token parsed success, user:%s", pContext, pContext->fd, pContext->user);
pContext->user);
return true; return true;
} }
...@@ -73,28 +69,27 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -73,28 +69,27 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) {
int outlen = 0; int outlen = 0;
unsigned char *base64 = base64_decode(token, len, &outlen); unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); httpError("context:%p, fd:%d, taosd token:%s parsed error", pContext, pContext->fd, token);
if (base64) free(base64); if (base64) free(base64);
return false; return false;
} }
if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) { if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s length error", pContext, pContext->fd, pContext->ipstr, token); httpError("context:%p, fd:%d, taosd token:%s length error", pContext, pContext->fd, token);
free(base64); free(base64);
return false; return false;
} }
char *descrypt = taosDesDecode(KEY_DES_4, (char *)base64, outlen); char *descrypt = taosDesDecode(KEY_DES_4, (char *)base64, outlen);
if (descrypt == NULL) { if (descrypt == NULL) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s descrypt error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, taosd token:%s descrypt error", pContext, pContext->fd, token);
token);
free(base64); free(base64);
return false; return false;
} else { } else {
tstrncpy(pContext->user, descrypt, sizeof(pContext->user)); tstrncpy(pContext->user, descrypt, sizeof(pContext->user));
tstrncpy(pContext->pass, descrypt + TSDB_USER_LEN, sizeof(pContext->pass)); tstrncpy(pContext->pass, descrypt + TSDB_USER_LEN, sizeof(pContext->pass));
httpDebug("context:%p, fd:%d, ip:%s, taosd token:%s parsed success, user:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, taosd token:%s parsed success, user:%s", pContext, pContext->fd, token,
pContext->ipstr, token, pContext->user); pContext->user);
free(base64); free(base64);
free(descrypt); free(descrypt);
return true; return true;
...@@ -116,7 +111,7 @@ bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen) { ...@@ -116,7 +111,7 @@ bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen) {
free(encrypt); free(encrypt);
free(base64); free(base64);
httpDebug("context:%p, fd:%d, ip:%s, gen taosd token:%s", pContext, pContext->fd, pContext->ipstr, token); httpDebug("context:%p, fd:%d, gen taosd token:%s", pContext, pContext->fd, token);
return true; return true;
} }
...@@ -37,16 +37,14 @@ extern bool httpParseHttpVersion(HttpContext* pContext); ...@@ -37,16 +37,14 @@ extern bool httpParseHttpVersion(HttpContext* pContext);
extern bool httpGetDecodeMethod(HttpContext* pContext); extern bool httpGetDecodeMethod(HttpContext* pContext);
extern bool httpParseHead(HttpContext* pContext); extern bool httpParseHead(HttpContext* pContext);
static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw); static void httpParseOnRequestLine(void *arg, const char *method, const char *target, const char *version, const char *target_raw);
static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase); static void httpParseOnStatusLine(void *arg, const char *version, int status_code, const char *reason_phrase);
static void on_header_field(void *arg, const char *key, const char *val); static void httpParseOnHeaderField(void *arg, const char *key, const char *val);
static void on_body(void *arg, const char *chunk, size_t len); static void httpParseOnBody(void *arg, const char *chunk, size_t len);
static void on_end(void *arg); static void httpParseOnEnd(void *arg);
static void on_error(void *arg, int status_code); static void httpParseOnError(void *arg, int status_code);
static void httpDestroyContext(void *data); static void httpDestroyContext(void *data);
static void httpMightDestroyContext(void *data);
static void ehttpReleaseContext(HttpContext *pContext);
static void httpRemoveContextFromEpoll(HttpContext *pContext) { static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
...@@ -54,15 +52,11 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { ...@@ -54,15 +52,11 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1);
taosCloseSocket(fd); taosCloseSocket(fd);
if (!tsHttpServer.fallback) {
ehttpDecContextRef(&pContext);
}
} }
} }
static void httpDestroyContext(void *data) { static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data; HttpContext *pContext = *(HttpContext **)data;
D("==context[%p] destroyed==", pContext);
if (pContext->fd > 0) taosClose(pContext->fd); if (pContext->fd > 0) taosClose(pContext->fd);
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
...@@ -79,18 +73,16 @@ static void httpDestroyContext(void *data) { ...@@ -79,18 +73,16 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext); httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext); httpFreeMultiCmds(pContext);
if (!tsHttpServer.fallback) { if (pContext->parser.parser) {
if (pContext->parser.parser) { ehttp_parser_destroy(pContext->parser.parser);
ehttp_parser_destroy(pContext->parser.parser); pContext->parser.parser = NULL;
pContext->parser.parser = NULL;
}
} }
taosTFree(pContext); taosTFree(pContext);
} }
bool httpInitContexts() { bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc"); tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) { if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache"); httpError("failed to init context cache");
return false; return false;
...@@ -135,20 +127,16 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -135,20 +127,16 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext)); HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL; if (pContext == NULL) return NULL;
D("==context[%p] created==", pContext);
pContext->fd = fd; pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10; pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY; pContext->state = HTTP_CONTEXT_STATE_READY;
ehttpIncContextRef(pContext);
uint64_t handleVal = (uint64_t)pContext; uint64_t handleVal = (uint64_t)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000);
pContext->ppContext = ppContext; pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
ehttpIncContextRef(pContext);
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
...@@ -164,7 +152,6 @@ HttpContext *httpGetContext(void *ptr) { ...@@ -164,7 +152,6 @@ HttpContext *httpGetContext(void *ptr) {
if (ppContext) { if (ppContext) {
HttpContext *pContext = *ppContext; HttpContext *pContext = *ppContext;
if (pContext) { if (pContext) {
if (!tsHttpServer.fallback) return pContext;
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext; return pContext;
...@@ -174,10 +161,6 @@ HttpContext *httpGetContext(void *ptr) { ...@@ -174,10 +161,6 @@ HttpContext *httpGetContext(void *ptr) {
} }
void httpReleaseContext(HttpContext *pContext) { void httpReleaseContext(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
ehttpReleaseContext(pContext);
return;
}
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount < 0) { if (refCount < 0) {
httpError("context:%p, is already released, refCount:%d", pContext, refCount); httpError("context:%p, is already released, refCount:%d", pContext, refCount);
...@@ -212,31 +195,25 @@ bool httpInitContext(HttpContext *pContext) { ...@@ -212,31 +195,25 @@ bool httpInitContext(HttpContext *pContext) {
memset(pParser, 0, sizeof(HttpParser)); memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer; pParser->pCur = pParser->pLast = pParser->buffer;
if (!tsHttpServer.fallback) { ehttp_parser_callbacks_t callbacks = {
ehttp_parser_callbacks_t callbacks = { httpParseOnRequestLine,
on_request_line, httpParseOnStatusLine,
on_status_line, httpParseOnHeaderField,
on_header_field, httpParseOnBody,
on_body, httpParseOnEnd,
on_end, httpParseOnError
on_error };
}; ehttp_parser_conf_t conf = {
ehttp_parser_conf_t conf = { .flush_block_size = 0
.flush_block_size = 0 };
}; pParser->parser = ehttp_parser_create(callbacks, conf, pContext);
pParser->parser = ehttp_parser_create(callbacks, conf, pContext); pParser->inited = 1;
pParser->inited = 1;
} httpDebug("context:%p, fd:%d, parsed:%d", pContext, pContext->fd, pContext->parsed);
httpDebug("context:%p, fd:%d, ip:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr,
pContext->accessTimes, pContext->parsed);
return true; return true;
} }
void httpCloseContextByApp(HttpContext *pContext) { void httpCloseContextByApp(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
if (pContext->parsed == false) return;
}
pContext->parsed = false; pContext->parsed = false;
bool keepAlive = true; bool keepAlive = true;
...@@ -249,150 +226,132 @@ void httpCloseContextByApp(HttpContext *pContext) { ...@@ -249,150 +226,132 @@ void httpCloseContextByApp(HttpContext *pContext) {
if (keepAlive) { if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpDebug("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd, httpDebug("context:%p, fd:%d, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd);
pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
httpDebug("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect", pContext, pContext->fd, httpDebug("context:%p, fd:%d, ast state:dropping, keepAlive:true, close connect", pContext, pContext->fd);
pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpDebug("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse context", pContext, pContext->fd, httpDebug("context:%p, fd:%d, last state:ready, keepAlive:true, reuse context", pContext, pContext->fd);
pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
httpDebug("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect", pContext, pContext->fd, httpDebug("context:%p, fd:%d, last state:ready, keepAlive:true, close connect", pContext, pContext->fd);
pContext->ipstr);
} else { } else {
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect", pContext, pContext->fd, httpError("context:%p, fd:%d, last state:%s:%d, keepAlive:true, close connect", pContext, pContext->fd,
pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); httpContextStateStr(pContext->state), pContext->state);
} }
} else { } else {
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
httpDebug("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close context", pContext, pContext->fd, httpDebug("context:%p, fd:%d, ilast state:%s:%d, keepAlive:false, close context", pContext, pContext->fd,
pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); httpContextStateStr(pContext->state), pContext->state);
} }
if (tsHttpServer.fallback) httpReleaseContext(pContext); httpReleaseContext(pContext);
} }
void httpCloseContextByServer(HttpContext *pContext) { void httpCloseContextByServer(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
if (pContext->closed) return;
pContext->closed = 1;
}
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, epoll finished, still used by app", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, epoll already finished, wait app finished", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll finished, close connect", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, epoll finished, close connect", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, epoll finished, will be closed soon", pContext, pContext->fd);
} else { } else {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); httpError("context:%p, fd:%d, unknown state:%d", pContext, pContext->fd, pContext->state);
} }
pContext->parsed = false; pContext->parsed = false;
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
} }
static void httpParseOnRequestLine(void *arg, const char *method, const char *target, const char *version, const char *target_raw) {
static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) {
HttpContext *pContext = (HttpContext*)arg; HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
int n = snprintf(pParser->pLast, avail, int n = snprintf(pParser->pLast, avail, "%s %s %s\r\n", method, target_raw, version);
"%s %s %s\r\n", method, target_raw, version);
char *last = pParser->pLast; char *last = pParser->pLast;
do { do {
if (n>=avail) { if (n >= avail) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), exceeding buffer size", httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), exceeding buffer size", pContext, pContext->fd, method,
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); target, version, target_raw);
break; break;
} }
pParser->bufsize += n; pParser->bufsize += n;
if (!httpGetHttpMethod(pContext)) { if (!httpGetHttpMethod(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http method failed", httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http method failed", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); method, target, version, target_raw);
break; break;
} }
if (!httpParseURL(pContext)) { if (!httpParseURL(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http url failed", httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http url failed", pContext, pContext->fd, method,
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); target, version, target_raw);
break; break;
} }
if (!httpParseHttpVersion(pContext)) { if (!httpParseHttpVersion(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http version failed", httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), parse http version failed", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); method, target, version, target_raw);
break; break;
} }
if (!httpGetDecodeMethod(pContext)) { if (!httpGetDecodeMethod(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), get decode method failed", httpDebug("context:%p, fd:%d, request line(%s,%s,%s,%s), get decode method failed", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); method, target, version, target_raw);
break; break;
} }
last += n; last += n;
pParser->pLast = last; pParser->pLast = last;
return; return;
} while (0); } while (0);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
} }
static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { static void httpParseOnStatusLine(void *arg, const char *version, int status_code, const char *reason_phrase) {
HttpContext *pContext = (HttpContext*)arg; HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
httpDebug("context:%p, fd:%d, failed to parse status line ", pContext, pContext->fd);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
} }
static void on_header_field(void *arg, const char *key, const char *val) { static void httpParseOnHeaderField(void *arg, const char *key, const char *val) {
HttpContext *pContext = (HttpContext*)arg; HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
if (pParser->failed) return; if (pParser->failed) return;
D("==key:[%s], val:[%s]==", key, val); httpDebug("context:%p, fd:%d, key:%s val:%s", pContext, pContext->fd, key, val);
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
int n = snprintf(pParser->pLast, avail, int n = snprintf(pParser->pLast, avail, "%s: %s\r\n", key, val);
"%s: %s\r\n", key, val);
char *last = pParser->pLast; char *last = pParser->pLast;
do { do {
if (n>=avail) { if (n >= avail) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), exceeding buffer size", httpDebug("context:%p, fd:%d, header field(%s,%s), exceeding buffer size", pContext, pContext->fd, key, val);
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val);
break; break;
} }
pParser->bufsize += n; pParser->bufsize += n;
pParser->pCur = pParser->pLast + n; pParser->pCur = pParser->pLast + n;
if (!httpParseHead(pContext)) { if (!httpParseHead(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed", httpDebug("context:%p, fd:%d, header field(%s,%s), parse failed", pContext, pContext->fd, key, val);
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val);
break; break;
} }
last += n; last += n;
pParser->pLast = last; pParser->pLast = last;
return; return;
} while (0); } while (0);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
} }
static void on_body(void *arg, const char *chunk, size_t len) { static void httpParseOnBody(void *arg, const char *chunk, size_t len) {
HttpContext *pContext = (HttpContext*)arg; HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
...@@ -404,16 +363,18 @@ static void on_body(void *arg, const char *chunk, size_t len) { ...@@ -404,16 +363,18 @@ static void on_body(void *arg, const char *chunk, size_t len) {
} }
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
if (len+1>=avail) { if (len + 1 >= avail) {
httpError("context:%p, fd:%d, failed parse body, exceeding buffer size", pContext, pContext->fd);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
return; return;
} }
memcpy(pParser->pLast, chunk, len); memcpy(pParser->pLast, chunk, len);
pParser->pLast += len; pParser->pLast += len;
pParser->data.len += len; pParser->data.len += len;
} }
static void on_end(void *arg) { static void httpParseOnEnd(void *arg) {
HttpContext *pContext = (HttpContext*)arg; HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
...@@ -424,47 +385,14 @@ static void on_end(void *arg) { ...@@ -424,47 +385,14 @@ static void on_end(void *arg) {
if (!pContext->parsed) { if (!pContext->parsed) {
pContext->parsed = true; pContext->parsed = true;
} }
}
static void on_error(void *arg, int status_code) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED;
}
static void httpMightDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (!tsHttpServer.fallback) {
httpRemoveContextFromEpoll(pContext);
ehttpDecContextRef(&pContext);
return;
}
httpDestroyContext(data);
}
static void ehttpReleaseContext(HttpContext *pContext) {
HttpContext **ppContext = pContext->ppContext;
if (tsHttpServer.contextCache != NULL) { httpDebug("context:%p, fd:%d, parse success", pContext, pContext->fd);
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
// httpDestroyContext((void **)(&ppContext));
}
} }
void ehttpIncContextRef(HttpContext *pContext) { static void httpParseOnError(void *arg, int status_code) {
if (tsHttpServer.fallback) return; HttpContext *pContext = (HttpContext *)arg;
atomic_add_fetch_32(&pContext->refCount, 1); HttpParser * pParser = &pContext->parser;
}
void ehttpDecContextRef(HttpContext **ppContext) { httpError("context:%p, fd:%d, failed to parse, status_code:%d", pContext, pContext->fd, status_code);
if (tsHttpServer.fallback) return; pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED;
HttpContext *pContext = *ppContext;
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount>0) return;
EQ_ASSERT(refCount==0);
httpDestroyContext(ppContext);
} }
...@@ -103,15 +103,13 @@ bool httpParseHttpVersion(HttpContext* pContext) { ...@@ -103,15 +103,13 @@ bool httpParseHttpVersion(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
char* pEnd = strchr(pParser->pLast, '1'); char* pEnd = strchr(pParser->pLast, '1');
if (pEnd == NULL) { if (pEnd == NULL) {
httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, httpError("context:%p, fd:%d, can't find http version at position:%s", pContext, pContext->fd, pParser->pLast);
pContext->ipstr, pParser->pLast);
httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR);
return false; return false;
} }
if (*(pEnd + 1) != '.') { if (*(pEnd + 1) != '.') {
httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, httpError("context:%p, fd:%d, can't find http version at position:%s", pContext, pContext->fd, pParser->pLast);
pContext->ipstr, pParser->pLast);
httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR);
return false; return false;
} }
...@@ -125,8 +123,7 @@ bool httpParseHttpVersion(HttpContext* pContext) { ...@@ -125,8 +123,7 @@ bool httpParseHttpVersion(HttpContext* pContext) {
else else
pContext->httpVersion = HTTP_VERSION_10; pContext->httpVersion = HTTP_VERSION_10;
httpDebug("context:%p, fd:%d, ip:%s, httpVersion:1.%d", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, httpVersion:1.%d", pContext, pContext->fd, pContext->httpVersion);
pContext->httpVersion);
return true; return true;
} }
...@@ -147,18 +144,20 @@ bool httpGetNextLine(HttpContext* pContext) { ...@@ -147,18 +144,20 @@ bool httpGetNextLine(HttpContext* pContext) {
bool httpGetHttpMethod(HttpContext* pContext) { bool httpGetHttpMethod(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
char* pSeek = strchr(pParser->pLast, ' '); char* pSeek = strchr(pParser->pLast, ' ');
if (pSeek == NULL) { if (pSeek == NULL) {
httpError("context:%p, fd:%d, failed to parse httpMethod", pContext, pContext->fd);
httpSendErrorResp(pContext, HTTP_PARSE_HTTP_METHOD_ERROR); httpSendErrorResp(pContext, HTTP_PARSE_HTTP_METHOD_ERROR);
return false; return false;
} }
pParser->method.pos = pParser->pLast; pParser->method.pos = pParser->pLast;
pParser->method.len = (int16_t)(pSeek - pParser->pLast); pParser->method.len = (int16_t)(pSeek - pParser->pLast);
pParser->method.pos[pParser->method.len] = 0; pParser->method.pos[pParser->method.len] = 0;
pParser->pLast = pSeek + 1; pParser->pLast = pSeek + 1;
httpTrace("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos); httpTrace("context:%p, fd:%d, httpMethod:%s", pContext, pContext->fd, pParser->method.pos);
return true; return true;
} }
...@@ -176,8 +175,8 @@ bool httpGetDecodeMethod(HttpContext* pContext) { ...@@ -176,8 +175,8 @@ bool httpGetDecodeMethod(HttpContext* pContext) {
return true; return true;
} }
httpError("context:%p, fd:%d, ip:%s, error:the url is not support, method:%s, path:%s", httpError("context:%p, fd:%d, error:the url is not support, method:%s, path:%s",
pContext, pContext->fd, pContext->ipstr, pParser->method.pos, pParser->path[0].pos); pContext, pContext->fd, pParser->method.pos, pParser->path[0].pos);
httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL);
return false; return false;
...@@ -187,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) { ...@@ -187,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) { if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) {
pParser->data.len = (int32_t)atoi(pParser->pLast + 16); pParser->data.len = (int32_t)atoi(pParser->pLast + 16);
httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, Content-Length:%d", pContext, pContext->fd,
pParser->data.len); pParser->data.len);
} else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) { } else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) {
if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) { if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) {
pContext->acceptEncoding = HTTP_COMPRESS_GZIP; pContext->acceptEncoding = HTTP_COMPRESS_GZIP;
httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, Accept-Encoding:gzip", pContext, pContext->fd);
} else { } else {
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, Accept-Encoding:identity", pContext, pContext->fd);
} }
} else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) { } else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) {
if (strstr(pParser->pLast + 18, "gzip") != NULL) { if (strstr(pParser->pLast + 18, "gzip") != NULL) {
pContext->contentEncoding = HTTP_COMPRESS_GZIP; pContext->contentEncoding = HTTP_COMPRESS_GZIP;
httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, Content-Encoding:gzip", pContext, pContext->fd);
} else { } else {
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr); httpTrace("context:%p, fd:%d, Content-Encoding:identity", pContext, pContext->fd);
} }
} else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) { } else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) {
if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) { if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) {
...@@ -211,8 +210,7 @@ bool httpParseHead(HttpContext* pContext) { ...@@ -211,8 +210,7 @@ bool httpParseHead(HttpContext* pContext) {
} else { } else {
pContext->httpKeepAlive = HTTP_KEEPALIVE_DISABLE; pContext->httpKeepAlive = HTTP_KEEPALIVE_DISABLE;
} }
httpTrace("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, keepAlive:%d", pContext, pContext->fd, pContext->httpKeepAlive);
pContext->httpKeepAlive);
} else if (strncasecmp(pParser->pLast, "Transfer-Encoding: ", 19) == 0) { } else if (strncasecmp(pParser->pLast, "Transfer-Encoding: ", 19) == 0) {
if (strncasecmp(pParser->pLast + 19, "chunked", 7) == 0) { if (strncasecmp(pParser->pLast + 19, "chunked", 7) == 0) {
pContext->httpChunked = HTTP_CHUNKED; pContext->httpChunked = HTTP_CHUNKED;
...@@ -244,129 +242,6 @@ bool httpParseHead(HttpContext* pContext) { ...@@ -244,129 +242,6 @@ bool httpParseHead(HttpContext* pContext) {
return true; return true;
} }
bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) {
char* pEnd = pParser->buffer + pParser->bufsize;
char* pRet = pParser->data.pos;
char* pSize = pParser->data.pos;
size_t size = strtoul(pSize, NULL, 16);
if (size <= 0) return false;
while (size > 0) {
char* pData = strstr(pSize, "\r\n");
if (pData == NULL || pData >= pEnd) return false;
pData += 2;
pSize = strstr(pData, "\r\n");
if (pSize == NULL || pSize >= pEnd) return false;
if ((size_t)(pSize - pData) != size) return false;
pSize += 2;
if (!test) {
memmove(pRet, pData, size);
pRet += size;
}
size = strtoul(pSize, NULL, 16);
}
if (!test) {
*pRet = '\0';
}
return true;
}
int httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
bool parsedOk = httpParseChunkedBody(pContext, pParser, true);
if (parsedOk) {
httpParseChunkedBody(pContext, pParser, false);
return HTTP_CHECK_BODY_SUCCESS;
} else {
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr);
if (httpReadDataImp(pContext) != HTTP_READ_DATA_SUCCESS) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_CHECK_BODY_ERROR;
} else {
return HTTP_CHECK_BODY_CONTINUE;
}
}
}
int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer);
if (dataReadLen > pParser->data.len) {
httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d",
pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len);
return HTTP_CHECK_BODY_ERROR;
} else if (dataReadLen < pParser->data.len) {
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read",
pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len);
return HTTP_CHECK_BODY_CONTINUE;
} else {
return HTTP_CHECK_BODY_SUCCESS;
}
}
bool httpParseRequest(HttpContext* pContext) {
HttpParser *pParser = &pContext->parser;
if (pContext->parsed) {
return true;
}
httpTraceL("context:%p, fd:%d, ip:%s, thread:%s, numOfContexts:%d, read size:%d, raw data:\n%s", pContext,
pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfContexts,
pContext->parser.bufsize, pContext->parser.buffer);
if (!httpGetHttpMethod(pContext)) {
return false;
}
if (!httpParseURL(pContext)) {
return false;
}
if (!httpParseHttpVersion(pContext)) {
return false;
}
if (!httpGetDecodeMethod(pContext)) {
return false;
}
do {
if (!httpGetNextLine(pContext)) {
return false;
}
// Empty line, end of the HTTP HEAD
if (pParser->pCur - pParser->pLast == 1) {
pParser->data.pos = ++pParser->pCur;
break;
}
if (!httpParseHead(pContext)) {
return false;
}
pParser->pLast = ++pParser->pCur;
} while (1);
httpDebug("context:%p, fd:%d, ip:%s, parse http head ok", pContext, pContext->fd, pContext->ipstr);
pContext->parsed = true;
return true;
}
int httpCheckReadCompleted(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser;
if (pContext->httpChunked == HTTP_UNCUNKED) {
return httpReadUnChunkedBody(pContext, pParser);
} else {
return httpReadChunkedBody(pContext, pParser);
}
}
bool httpDecodeRequest(HttpContext* pContext) { bool httpDecodeRequest(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser; HttpParser* pParser = &pContext->parser;
if (pParser->pMethod->decodeFp == NULL) { if (pParser->pMethod->decodeFp == NULL) {
...@@ -380,17 +255,16 @@ bool httpDecodeRequest(HttpContext* pContext) { ...@@ -380,17 +255,16 @@ bool httpDecodeRequest(HttpContext* pContext) {
* Process the request from http pServer * Process the request from http pServer
*/ */
bool httpProcessData(HttpContext* pContext) { bool httpProcessData(HttpContext* pContext) {
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) { if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s not in ready state, stop process request", httpDebug("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); httpContextStateStr(pContext->state));
httpCloseContextByApp(pContext); httpCloseContextByApp(pContext);
return false; return false;
} }
// handle Cross-domain request // handle Cross-domain request
if (strcmp(pContext->parser.method.pos, "OPTIONS") == 0) { if (strcmp(pContext->parser.method.pos, "OPTIONS") == 0) {
httpDebug("context:%p, fd:%d, ip:%s, process options request", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success"); httpSendOptionResp(pContext, "process options request success");
} else { } else {
if (!httpDecodeRequest(pContext)) { if (!httpDecodeRequest(pContext)) {
......
...@@ -52,14 +52,12 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { ...@@ -52,14 +52,12 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) {
} }
if (len < 0) { if (len < 0) {
httpDebug("context:%p, fd:%d, ip:%s, socket write errno:%d, times:%d", httpDebug("context:%p, fd:%d, socket write errno:%d, times:%d", pContext, pContext->fd, errno, countWait);
pContext, pContext->fd, pContext->ipstr, errno, countWait);
if (++countWait > HTTP_WRITE_RETRY_TIMES) break; if (++countWait > HTTP_WRITE_RETRY_TIMES) break;
taosMsleep(HTTP_WRITE_WAIT_TIME_MS); taosMsleep(HTTP_WRITE_WAIT_TIME_MS);
continue; continue;
} else if (len == 0) { } else if (len == 0) {
httpDebug("context:%p, fd:%d, ip:%s, socket write errno:%d, connect already closed", httpDebug("context:%p, fd:%d, socket write errno:%d, connect already closed", pContext, pContext->fd, errno);
pContext, pContext->fd, pContext->ipstr, errno);
break; break;
} else { } else {
countWait = 0; countWait = 0;
...@@ -70,14 +68,13 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { ...@@ -70,14 +68,13 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) {
return writeLen; return writeLen;
} }
int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) {
int writeSz = httpWriteBufByFd(pContext, buf, sz); int writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz != sz) { if (writeSz != sz) {
httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s", httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response:\n%s", pContext, pContext->fd, sz,
pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); writeSz, buf);
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, httpTrace("context:%p, fd:%d, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, sz, writeSz, buf);
pContext->ipstr, sz, writeSz, buf);
} }
return writeSz; return writeSz;
...@@ -86,8 +83,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { ...@@ -86,8 +83,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) {
int httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int sz) { int httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int sz) {
int writeSz = httpWriteBufByFd(pContext, buf, sz); int writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz != sz) { if (writeSz != sz) {
httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response", httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response", pContext, pContext->fd, sz,
pContext, pContext->fd, pContext->ipstr, sz, writeSz); writeSz);
} }
return writeSz; return writeSz;
...@@ -99,7 +96,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { ...@@ -99,7 +96,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); uint64_t srcLen = (uint64_t) (buf->lst - buf->buf);
if (buf->pContext->fd <= 0) { if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd);
buf->pContext->fd = -1; buf->pContext->fd = -1;
} }
...@@ -113,12 +110,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { ...@@ -113,12 +110,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) { if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) {
if (buf->lst == buf->buf) { if (buf->lst == buf->buf) {
httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd);
return 0; // there is no data to dump. return 0; // there is no data to dump.
} else { } else {
int len = sprintf(sLen, "%lx\r\n", srcLen); int len = sprintf(sLen, "%lx\r\n", srcLen);
httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s", httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd,
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf); srcLen, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen); remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen);
} }
...@@ -129,18 +126,18 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { ...@@ -129,18 +126,18 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if (ret == 0) { if (ret == 0) {
if (compressBufLen > 0) { if (compressBufLen > 0) {
int len = sprintf(sLen, "%x\r\n", compressBufLen); int len = sprintf(sLen, "%x\r\n", compressBufLen);
httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s",
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf); buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen); remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, (int)compressBufLen);
} else { } else {
httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s", httpTrace("context:%p, fd:%d, last:%d, compress already dumped, response:\n%s", buf->pContext,
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf); buf->pContext->fd, isTheLast, buf->buf);
return 0; // there is no data to dump. return 0; // there is no data to dump.
} }
} else { } else {
httpError("context:%p, fd:%d, ip:%s, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s", httpError("context:%p, fd:%d, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s",
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, isTheLast, ret, buf->buf); buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf);
return 0; return 0;
} }
} }
...@@ -173,7 +170,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) { ...@@ -173,7 +170,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) {
void httpWriteJsonBufEnd(JsonBuf* buf) { void httpWriteJsonBufEnd(JsonBuf* buf) {
if (buf->pContext->fd <= 0) { if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); httpTrace("context:%p, fd:%d, json buf fd is 0", buf->pContext, buf->pContext->fd);
buf->pContext->fd = -1; buf->pContext->fd = -1;
} }
...@@ -192,7 +189,7 @@ void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) { ...@@ -192,7 +189,7 @@ void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) {
httpGzipCompressInit(buf->pContext); httpGzipCompressInit(buf->pContext);
} }
httpDebug("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); httpDebug("context:%p, fd:%d, json buffer initialized", buf->pContext, buf->pContext->fd);
} }
void httpJsonItemToken(JsonBuf* buf) { void httpJsonItemToken(JsonBuf* buf) {
......
...@@ -46,7 +46,7 @@ const char *httpRespTemplate[] = { ...@@ -46,7 +46,7 @@ const char *httpRespTemplate[] = {
}; };
static void httpSendErrorRespImp(HttpContext *pContext, int httpCode, char *httpCodeStr, int errNo, char *desc) { static void httpSendErrorRespImp(HttpContext *pContext, int httpCode, char *httpCodeStr, int errNo, char *desc) {
httpError("context:%p, fd:%d, ip:%s, code:%d, error:%s", pContext, pContext->fd, pContext->ipstr, httpCode, desc); httpError("context:%p, fd:%d, code:%d, error:%s", pContext, pContext->fd, httpCode, desc);
char head[512] = {0}; char head[512] = {0};
char body[512] = {0}; char body[512] = {0};
...@@ -169,7 +169,7 @@ void httpSendErrorRespWithDesc(HttpContext *pContext, int errNo, char *desc) { ...@@ -169,7 +169,7 @@ void httpSendErrorRespWithDesc(HttpContext *pContext, int errNo, char *desc) {
httpCodeStr = "Bad Request"; httpCodeStr = "Bad Request";
break; break;
default: default:
httpError("context:%p, fd:%d, ip:%s, error:%d not recognized", pContext, pContext->fd, pContext->ipstr, errNo); httpError("context:%p, fd:%d, error:%d not recognized", pContext, pContext->fd, errNo);
break; break;
} }
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#define EPOLLWAKEUP (1u << 29) #define EPOLLWAKEUP (1u << 29)
#endif #endif
static bool ehttpReadData(HttpContext *pContext); static bool httpReadData(HttpContext *pContext);
static void httpStopThread(HttpThread* pThread) { static void httpStopThread(HttpThread* pThread) {
pThread->stop = true; pThread->stop = true;
...@@ -73,43 +73,9 @@ void httpCleanUpConnect() { ...@@ -73,43 +73,9 @@ void httpCleanUpConnect() {
httpDebug("http server:%s is cleaned up", pServer->label); httpDebug("http server:%s is cleaned up", pServer->label);
} }
int httpReadDataImp(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser;
while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
int nread = (int)taosReadSocket(pContext->fd, pParser->buffer + pParser->bufsize, HTTP_STEP_SIZE);
if (nread >= 0 && nread < HTTP_STEP_SIZE) {
pParser->bufsize += nread;
break;
} else if (nread < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event",
pContext, pContext->fd, pContext->ipstr, errno);
break;
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
return HTTP_READ_DATA_FAILED;
}
} else {
pParser->bufsize += nread;
}
if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
return HTTP_REQUSET_TOO_BIG;
}
}
pParser->buffer[pParser->bufsize] = 0;
return HTTP_READ_DATA_SUCCESS;
}
static bool httpDecompressData(HttpContext *pContext) { static bool httpDecompressData(HttpContext *pContext) {
if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); httpTraceL("context:%p, fd:%d, content:%s", pContext, pContext->fd, pContext->parser.data.pos);
return true; return true;
} }
...@@ -125,64 +91,18 @@ static bool httpDecompressData(HttpContext *pContext) { ...@@ -125,64 +91,18 @@ static bool httpDecompressData(HttpContext *pContext) {
if (ret == 0) { if (ret == 0) {
memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen); memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen);
pContext->parser.data.pos[decompressBufLen] = 0; pContext->parser.data.pos[decompressBufLen] = 0;
httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd, httpTraceL("context:%p, fd:%d, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd,
pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); pContext->parser.data.len, decompressBufLen, decompressBuf);
pContext->parser.data.len = decompressBufLen; pContext->parser.data.len = decompressBufLen;
} else { } else {
httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d", httpError("context:%p, fd:%d, failed to decompress data, rawSize:%d, error:%d", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, ret); pContext->parser.data.len, ret);
} }
free(decompressBuf); free(decompressBuf);
return ret == 0; return ret == 0;
} }
static bool httpReadData(HttpContext *pContext) {
if (!tsHttpServer.fallback) return ehttpReadData(pContext);
if (!pContext->parsed) {
httpInitContext(pContext);
}
int32_t code = httpReadDataImp(pContext);
if (code != HTTP_READ_DATA_SUCCESS) {
if (code == HTTP_READ_DATA_FAILED) {
httpReleaseContext(pContext);
} else {
httpSendErrorResp(pContext, code);
httpNotifyContextClose(pContext);
}
return false;
}
if (!httpParseRequest(pContext)) {
httpNotifyContextClose(pContext);
return false;
}
int ret = httpCheckReadCompleted(pContext);
if (ret == HTTP_CHECK_BODY_CONTINUE) {
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
httpReleaseContext(pContext);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);
if (httpDecompressData(pContext)) {
return true;
} else {
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
} else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
}
static void httpProcessHttpData(void *param) { static void httpProcessHttpData(void *param) {
HttpServer *pServer = &tsHttpServer; HttpServer *pServer = &tsHttpServer;
HttpThread *pThread = (HttpThread *)param; HttpThread *pThread = (HttpThread *)param;
...@@ -194,8 +114,6 @@ static void httpProcessHttpData(void *param) { ...@@ -194,8 +114,6 @@ static void httpProcessHttpData(void *param) {
sigaddset(&set, SIGPIPE); sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL); pthread_sigmask(SIG_SETMASK, &set, NULL);
elog_set_thread_name("httpProcessHttpData");
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
...@@ -215,66 +133,51 @@ static void httpProcessHttpData(void *param) { ...@@ -215,66 +133,51 @@ static void httpProcessHttpData(void *param) {
continue; continue;
} }
ehttpIncContextRef(pContext);
if (events[i].events & EPOLLPRI) { if (events[i].events & EPOLLPRI) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", httpDebug("context:%p, fd:%d, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext); httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) { if (events[i].events & EPOLLRDHUP) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", httpDebug("context:%p, fd:%d, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext); httpCloseContextByServer(pContext);
httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", httpDebug("context:%p, fd:%d, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext); httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", httpDebug("context:%p, fd:%d, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext); httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", httpDebug("context:%p, fd:%d, state:%s, not in ready state, ignore read events", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); httpContextStateStr(pContext->state));
httpReleaseContext(pContext); httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
if (pServer->status != HTTP_SERVER_RUNNING) { if (pServer->status != HTTP_SERVER_RUNNING) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext, httpDebug("context:%p, fd:%d, state:%s, server is not running, accessed:%d, close connect", pContext,
pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); pContext->fd, httpContextStateStr(pContext->state), pContext->accessTimes);
httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
httpNotifyContextClose(pContext); httpNotifyContextClose(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
} else { } else {
if (httpReadData(pContext)) { if (httpReadData(pContext)) {
(*(pThread->processData))(pContext); (*(pThread->processData))(pContext);
atomic_fetch_add_32(&pServer->requestNum, 1); atomic_fetch_add_32(&pServer->requestNum, 1);
} }
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
} }
} }
} }
...@@ -355,8 +258,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -355,8 +258,7 @@ static void *httpAcceptHttpConnection(void *arg) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext->ipstr, pThread->label, strerror(errno)); pContext->ipstr, pThread->label, strerror(errno));
taosClose(pContext->fd); taosClose(pContext->fd);
if (tsHttpServer.fallback) httpReleaseContext(pContext); httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue; continue;
} }
...@@ -430,12 +332,10 @@ bool httpInitConnect() { ...@@ -430,12 +332,10 @@ bool httpInitConnect() {
return true; return true;
} }
static bool httpReadData(HttpContext *pContext) {
static bool ehttpReadData(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
EQ_ASSERT(!pContext->parsed); EQ_ASSERT(!pContext->parsed);
if (!pParser->parser) { if (!pParser->parser) {
if (!pParser->inited) { if (!pParser->inited) {
httpInitContext(pContext); httpInitContext(pContext);
...@@ -448,61 +348,44 @@ static bool ehttpReadData(HttpContext *pContext) { ...@@ -448,61 +348,44 @@ static bool ehttpReadData(HttpContext *pContext) {
pContext->accessTimes++; pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
char buf[HTTP_STEP_SIZE+1] = {0}; char buf[HTTP_STEP_SIZE + 1] = {0};
int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf)); int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf));
if (nread > 0) { if (nread > 0) {
buf[nread] = '\0'; buf[nread] = '\0';
if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) {
D("==half of request line received:\n%s\n==", buf);
}
if (ehttp_parser_parse(pParser->parser, buf, nread)) { if (ehttp_parser_parse(pParser->parser, buf, nread)) {
D("==parsing failed=="); httpError("context:%p, fd:%d, init parse failed, close connect", pContext, pContext->fd);
httpCloseContextByServer(pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
if (pContext->parser.failed) { if (pContext->parser.failed) {
D("==parsing failed: [0x%x]==", pContext->parser.failed); httpError("context:%p, fd:%d, parse failed, close connect", pContext, pContext->fd);
httpNotifyContextClose(pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
if (pContext->parsed) { if (pContext->parsed) {
// int ret = httpCheckReadCompleted(pContext); httpDebug("context:%p, fd:%d, read size:%d, dataLen:%d", pContext, pContext->fd, pContext->parser.bufsize,
// already done in ehttp_parser pContext->parser.data.len);
int ret = HTTP_CHECK_BODY_SUCCESS; if (httpDecompressData(pContext)) {
if (ret == HTTP_CHECK_BODY_CONTINUE) { return true;
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);
if (httpDecompressData(pContext)) {
return true;
} else {
httpNotifyContextClose(pContext);
return false;
}
} else { } else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext); httpNotifyContextClose(pContext);
return false; return false;
} }
} }
return pContext->parsed; return pContext->parsed;
} else if (nread < 0) { } else if (nread < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event", httpDebug("context:%p, fd:%d, read from socket error:%d, wait another event", pContext, pContext->fd, errno);
pContext, pContext->fd, pContext->ipstr, errno); return false; // later again
return false; // later again
} else { } else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", httpError("context:%p, fd:%d, read from socket error:%d, close connect", pContext, pContext->fd, errno);
pContext, pContext->fd, pContext->ipstr, errno);
return false; return false;
} }
} else { } else {
// eof httpError("context:%p, fd:%d, nread:%d, wait another event", pContext, pContext->fd, nread);
return false; return false;
} }
} }
...@@ -39,15 +39,15 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -39,15 +39,15 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
// taosCacheRelease(server->sessionCache, (void **)&temp, false); // taosCacheRelease(server->sessionCache, (void **)&temp, false);
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, httpError("context:%p, fd:%d, user:%s, error:%s", pContext, pContext->fd, pContext->user,
httpMsg[HTTP_SESSION_FULL]); httpMsg[HTTP_SESSION_FULL]);
taos_close(taos); taos_close(taos);
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
return; return;
} }
httpDebug("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
...@@ -61,11 +61,10 @@ static void httpFetchSessionImp(HttpContext *pContext) { ...@@ -61,11 +61,10 @@ static void httpFetchSessionImp(HttpContext *pContext) {
pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len); pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len);
if (pContext->session != NULL) { if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->session->refCount, 1); atomic_add_fetch_32(&pContext->session->refCount, 1);
httpDebug("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, session not found", pContext, pContext->fd, pContext->user);
pContext->user);
} }
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
......
...@@ -56,18 +56,18 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n ...@@ -56,18 +56,18 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
if (isContinue) { if (isContinue) {
// retrieve next batch of rows // retrieve next batch of rows
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", httpDebug("context:%p, fd:%d, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", pContext,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, numOfRows, sql); pContext->fd, pContext->user, multiCmds->pos, numOfRows, sql);
taos_fetch_rows_a(result, httpProcessMultiSqlRetrieveCallBack, param); taos_fetch_rows_a(result, httpProcessMultiSqlRetrieveCallBack, param);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, numOfRows, sql); pContext->user, multiCmds->pos, numOfRows, sql);
if (numOfRows < 0) { if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); pContext->user, multiCmds->pos, tstrerror(numOfRows), sql);
} }
taos_free_result(result); taos_free_result(result);
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) {
...@@ -94,20 +94,20 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { ...@@ -94,20 +94,20 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
char * sql = httpGetCmdsString(pContext, singleCmd->sql); char * sql = httpGetCmdsString(pContext, singleCmd->sql);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpWarn("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, code:%s:inprogress, sql:%s", httpWarn("context:%p, fd:%d, user:%s, process pos:%d, code:%s:inprogress, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(code), sql); pContext->user, multiCmds->pos, tstrerror(code), sql);
return; return;
} }
if (code < 0) { if (code < 0) {
if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) { if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) {
singleCmd->code = code; singleCmd->code = code;
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos jump to:%d, last code:%s, last sql:%s", httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos + 1, tstrerror(code), sql); pContext->user, multiCmds->pos + 1, tstrerror(code), sql);
} else { } else {
singleCmd->code = code; singleCmd->code = code;
httpError("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, error code:%s, sql:%s", httpError("context:%p, fd:%d, user:%s, process pos:%d, error code:%s, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(code), sql); pContext->user, multiCmds->pos, tstrerror(code), sql);
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN) {
if (encode->startJsonFp) (encode->startJsonFp)(pContext, singleCmd, result); if (encode->startJsonFp) (encode->startJsonFp)(pContext, singleCmd, result);
...@@ -125,8 +125,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { ...@@ -125,8 +125,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
if (isUpdate) { if (isUpdate) {
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int affectRows = taos_affected_rows(result);
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s", httpDebug("context:%p, fd:%d, user:%s, process pos:%d, affect rows:%d, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, affectRows, sql); pContext->user, multiCmds->pos, affectRows, sql);
singleCmd->code = 0; singleCmd->code = 0;
...@@ -151,8 +151,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { ...@@ -151,8 +151,8 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
taos_free_result(result); taos_free_result(result);
httpProcessMultiSql(pContext); httpProcessMultiSql(pContext);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start retrieve, sql:%s", httpDebug("context:%p, fd:%d, user:%s, process pos:%d, start retrieve, sql:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, sql); pContext->user, multiCmds->pos, sql);
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->startJsonFp) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->startJsonFp) {
(encode->startJsonFp)(pContext, singleCmd, result); (encode->startJsonFp)(pContext, singleCmd, result);
...@@ -170,8 +170,8 @@ void httpProcessMultiSql(HttpContext *pContext) { ...@@ -170,8 +170,8 @@ void httpProcessMultiSql(HttpContext *pContext) {
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (multiCmds->pos >= multiCmds->size) { if (multiCmds->pos >= multiCmds->size) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, size:%d, stop mulit-querys", httpDebug("context:%p, fd:%d, user:%s, process pos:%d, size:%d, stop mulit-querys", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, multiCmds->size); pContext->user, multiCmds->pos, multiCmds->size);
if (encode->cleanJsonFp) { if (encode->cleanJsonFp) {
(encode->cleanJsonFp)(pContext); (encode->cleanJsonFp)(pContext);
} }
...@@ -182,8 +182,8 @@ void httpProcessMultiSql(HttpContext *pContext) { ...@@ -182,8 +182,8 @@ void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos; HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos;
char *sql = httpGetCmdsString(pContext, cmd->sql); char *sql = httpGetCmdsString(pContext, cmd->sql);
httpTraceL("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, httpTraceL("context:%p, fd:%d, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, multiCmds->pos, sql); multiCmds->pos, sql);
taosNotePrintHttp(sql); taosNotePrintHttp(sql);
taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext); taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext);
} }
...@@ -197,8 +197,8 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { ...@@ -197,8 +197,8 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
return; return;
} }
httpDebug("context:%p, fd:%d, ip:%s, user:%s, start multi-querys pos:%d, size:%d", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, start multi-querys pos:%d, size:%d", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, multiCmds->pos, multiCmds->size); multiCmds->pos, multiCmds->size);
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (encode->initJsonFp) { if (encode->initJsonFp) {
(encode->initJsonFp)(pContext); (encode->initJsonFp)(pContext);
...@@ -226,24 +226,23 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int ...@@ -226,24 +226,23 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
#if 0 #if 0
// todo refactor // todo refactor
if (tscResultsetFetchCompleted(result)) { if (tscResultsetFetchCompleted(result)) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->user);
pContext->user);
isContinue = false; isContinue = false;
} }
#endif #endif
if (isContinue) { if (isContinue) {
// retrieve next batch of rows // retrieve next batch of rows
httpDebug("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, numOfRows); numOfRows);
taos_fetch_rows_a(result, httpProcessSingleSqlRetrieveCallBack, param); taos_fetch_rows_a(result, httpProcessSingleSqlRetrieveCallBack, param);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user,
pContext->user, numOfRows); numOfRows);
if (numOfRows < 0) { if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user,
pContext->user, tstrerror(numOfRows)); tstrerror(numOfRows));
} }
taos_free_result(result); taos_free_result(result);
...@@ -269,20 +268,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo ...@@ -269,20 +268,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result); pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result);
return; return;
} }
if (code < 0) { if (code < 0) {
SSqlObj *pObj = (SSqlObj *)result; SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) { if (code == TSDB_CODE_TSC_INVALID_SQL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", pContext,
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload); pContext->fd, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload);
httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload);
} else { } else {
httpError("context:%p, fd:%d, ip:%s, user:%s, query error, taos:%p, code:%s, sqlObj:%p", httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), pObj); pContext->user, pContext->session->taos, tstrerror(code), pObj);
httpSendTaosdErrorResp(pContext, code); httpSendTaosdErrorResp(pContext, code);
} }
taos_free_result(result); taos_free_result(result);
...@@ -294,8 +293,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo ...@@ -294,8 +293,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int affectRows = taos_affected_rows(result);
httpDebug("context:%p, fd:%d, ip:%s, user:%s, affect rows:%d, stop query, sqlObj:%p", httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, affectRows, result); pContext->user, affectRows, result);
if (encode->startJsonFp) { if (encode->startJsonFp) {
(encode->startJsonFp)(pContext, &pContext->singleCmd, result); (encode->startJsonFp)(pContext, &pContext->singleCmd, result);
...@@ -312,8 +311,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo ...@@ -312,8 +311,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
taos_free_result(result); taos_free_result(result);
httpCloseContextByApp(pContext); httpCloseContextByApp(pContext);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, start retrieve", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, start retrieve", pContext, pContext->fd, pContext->user);
pContext->user);
if (encode->startJsonFp) { if (encode->startJsonFp) {
(encode->startJsonFp)(pContext, &pContext->singleCmd, result); (encode->startJsonFp)(pContext, &pContext->singleCmd, result);
...@@ -333,14 +331,12 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { ...@@ -333,14 +331,12 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
HttpSession *pSession = pContext->session; HttpSession *pSession = pContext->session;
if (sql == NULL || sql[0] == 0) { if (sql == NULL || sql[0] == 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:no sql input", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, error:no sql input", pContext, pContext->fd, pContext->user);
pContext->user);
httpSendErrorResp(pContext, HTTP_NO_SQL_INPUT); httpSendErrorResp(pContext, HTTP_NO_SQL_INPUT);
return; return;
} }
httpTraceL("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, httpTraceL("context:%p, fd:%d, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->user, sql);
pContext->user, sql);
taosNotePrintHttp(sql); taosNotePrintHttp(sql);
taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext);
} }
...@@ -350,8 +346,8 @@ void httpProcessLoginCmd(HttpContext *pContext) { ...@@ -350,8 +346,8 @@ void httpProcessLoginCmd(HttpContext *pContext) {
if (!httpGenTaosdAuthToken(pContext, token, 128)) { if (!httpGenTaosdAuthToken(pContext, token, 128)) {
httpSendErrorResp(pContext, HTTP_GEN_TAOSD_TOKEN_ERR); httpSendErrorResp(pContext, HTTP_GEN_TAOSD_TOKEN_ERR);
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, login via http, return token:%s", httpDebug("context:%p, fd:%d, user:%s, login via http, return token:%s", pContext, pContext->fd, pContext->user,
pContext, pContext->fd, pContext->ipstr, pContext->user, token); token);
httpSendSuccResp(pContext, token); httpSendSuccResp(pContext, token);
} }
} }
...@@ -397,17 +393,16 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { ...@@ -397,17 +393,16 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
if (pContext == NULL) return; if (pContext == NULL) return;
if (code < 0) { if (code < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, login error, code:%s", pContext, pContext->fd, pContext->user,
pContext->user, tstrerror(code)); tstrerror(code));
httpSendTaosdErrorResp(pContext, code); httpSendTaosdErrorResp(pContext, code);
return; return;
} }
httpDebug("context:%p, fd:%d, ip:%s, user:%s, connect tdengine success, taos:%p", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, connect tdengine success, taos:%p", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, pContext->taos); pContext->taos);
if (pContext->taos == NULL) { if (pContext->taos == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, taos is empty", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, login error, taos is empty", pContext, pContext->fd, pContext->user);
pContext->user);
httpSendErrorResp(pContext, HTTP_NO_ENOUGH_SESSIONS); httpSendErrorResp(pContext, HTTP_NO_ENOUGH_SESSIONS);
return; return;
} }
...@@ -428,8 +423,8 @@ void httpProcessRequest(HttpContext *pContext) { ...@@ -428,8 +423,8 @@ void httpProcessRequest(HttpContext *pContext) {
if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) { if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) {
taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext, taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext,
&(pContext->taos)); &(pContext->taos));
httpDebug("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, pContext->taos); pContext->taos);
} else { } else {
httpExecCmd(pContext); httpExecCmd(pContext);
} }
......
...@@ -40,12 +40,6 @@ HttpServer tsHttpServer; ...@@ -40,12 +40,6 @@ HttpServer tsHttpServer;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
int httpInitSystem() { int httpInitSystem() {
tsHttpServer.fallback = 0;
const char *v = getenv("FALLBACK");
if (v) {
tsHttpServer.fallback = 1;
}
strcpy(tsHttpServer.label, "rest"); strcpy(tsHttpServer.label, "rest");
tsHttpServer.serverIp = 0; tsHttpServer.serverIp = 0;
tsHttpServer.serverPort = tsHttpPort; tsHttpServer.serverPort = tsHttpPort;
......
...@@ -141,16 +141,15 @@ int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize) { ...@@ -141,16 +141,15 @@ int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize) {
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) {
if (cmdSize > HTTP_MAX_CMD_SIZE) { if (cmdSize > HTTP_MAX_CMD_SIZE) {
httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, cmdSize, HTTP_MAX_CMD_SIZE); cmdSize, HTTP_MAX_CMD_SIZE);
return false; return false;
} }
if (pContext->multiCmds == NULL) { if (pContext->multiCmds == NULL) {
pContext->multiCmds = (HttpSqlCmds *)malloc(sizeof(HttpSqlCmds)); pContext->multiCmds = (HttpSqlCmds *)malloc(sizeof(HttpSqlCmds));
if (pContext->multiCmds == NULL) { if (pContext->multiCmds == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, malloc multiCmds error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, malloc multiCmds error", pContext, pContext->fd, pContext->user);
pContext->user);
return false; return false;
} }
memset(pContext->multiCmds, 0, sizeof(HttpSqlCmds)); memset(pContext->multiCmds, 0, sizeof(HttpSqlCmds));
...@@ -161,7 +160,7 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { ...@@ -161,7 +160,7 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) {
free(multiCmds->cmds); free(multiCmds->cmds);
multiCmds->cmds = (HttpSqlCmd *)malloc((size_t)cmdSize * sizeof(HttpSqlCmd)); multiCmds->cmds = (HttpSqlCmd *)malloc((size_t)cmdSize * sizeof(HttpSqlCmd));
if (multiCmds->cmds == NULL) { if (multiCmds->cmds == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd,
pContext->user, cmdSize); pContext->user, cmdSize);
return false; return false;
} }
...@@ -172,8 +171,8 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) { ...@@ -172,8 +171,8 @@ bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize) {
free(multiCmds->buffer); free(multiCmds->buffer);
multiCmds->buffer = (char *)malloc((size_t)bufferSize); multiCmds->buffer = (char *)malloc((size_t)bufferSize);
if (multiCmds->buffer == NULL) { if (multiCmds->buffer == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user,
pContext->user, bufferSize); bufferSize);
return false; return false;
} }
multiCmds->bufferSize = bufferSize; multiCmds->bufferSize = bufferSize;
...@@ -191,15 +190,14 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize) { ...@@ -191,15 +190,14 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
if (cmdSize > HTTP_MAX_CMD_SIZE) { if (cmdSize > HTTP_MAX_CMD_SIZE) {
httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user,
pContext->ipstr, pContext->user, cmdSize, HTTP_MAX_CMD_SIZE); cmdSize, HTTP_MAX_CMD_SIZE);
return false; return false;
} }
multiCmds->cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd)); multiCmds->cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd));
if (multiCmds->cmds == NULL) { if (multiCmds->cmds == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize);
pContext->user, cmdSize);
return false; return false;
} }
memset(multiCmds->cmds + multiCmds->maxSize, 0, (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd)); memset(multiCmds->cmds + multiCmds->maxSize, 0, (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd));
...@@ -212,15 +210,14 @@ bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize) { ...@@ -212,15 +210,14 @@ bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
if (bufferSize > HTTP_MAX_BUFFER_SIZE) { if (bufferSize > HTTP_MAX_BUFFER_SIZE) {
httpError("context:%p, fd:%d, ip:%s, user:%s, mulitcmd buffer size:%d large then %d", httpError("context:%p, fd:%d, user:%s, mulitcmd buffer size:%d large then %d", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE); pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE);
return false; return false;
} }
multiCmds->buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize); multiCmds->buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize);
if (multiCmds->buffer == NULL) { if (multiCmds->buffer == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, bufferSize);
pContext->user, bufferSize);
return false; return false;
} }
memset(multiCmds->buffer + multiCmds->bufferSize, 0, (size_t)(bufferSize - multiCmds->bufferSize)); memset(multiCmds->buffer + multiCmds->bufferSize, 0, (size_t)(bufferSize - multiCmds->bufferSize));
......
...@@ -80,15 +80,13 @@ bool restGetPassFromUrl(HttpContext* pContext) { ...@@ -80,15 +80,13 @@ bool restGetPassFromUrl(HttpContext* pContext) {
} }
bool restProcessLoginRequest(HttpContext* pContext) { bool restProcessLoginRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process restful login msg", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, process restful login msg", pContext, pContext->fd, pContext->user);
pContext->user);
pContext->reqType = HTTP_REQTYPE_LOGIN; pContext->reqType = HTTP_REQTYPE_LOGIN;
return true; return true;
} }
bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) { bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process restful sql msg", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, user:%s, process restful sql msg", pContext, pContext->fd, pContext->user);
pContext->user);
char* sql = pContext->parser.data.pos; char* sql = pContext->parser.data.pos;
if (sql == NULL) { if (sql == NULL) {
......
...@@ -155,19 +155,17 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -155,19 +155,17 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
} }
if (cmd->numOfRows >= tsRestRowLimit) { if (cmd->numOfRows >= tsRestRowLimit) {
httpDebug("context:%p, fd:%d, ip:%s, user:%s, retrieve rows:%d larger than limit:%d, abort retrieve", pContext, httpDebug("context:%p, fd:%d, user:%s, retrieve rows:%d larger than limit:%d, abort retrieve", pContext,
pContext->fd, pContext->ipstr, pContext->user, cmd->numOfRows, tsRestRowLimit); pContext->fd, pContext->user, cmd->numOfRows, tsRestRowLimit);
return false; return false;
} } else {
else {
if (pContext->fd <= 0) { if (pContext->fd <= 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, connection is closed, abort retrieve", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, connection is closed, abort retrieve", pContext, pContext->fd,
pContext->ipstr, pContext->user); pContext->user);
return false; return false;
} } else {
else { httpDebug("context:%p, fd:%d, user:%s, total rows:%d retrieved", pContext, pContext->fd, pContext->user,
httpDebug("context:%p, fd:%d, ip:%s, user:%s, total rows:%d retrieved", pContext, pContext->fd, pContext->ipstr, cmd->numOfRows);
pContext->user, cmd->numOfRows);
return true; return true;
} }
} }
......
...@@ -800,7 +800,7 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -800,7 +800,7 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
} }
*/ */
bool tgProcessQueryRequest(HttpContext *pContext, char *db) { bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
httpDebug("context:%p, fd:%d, ip:%s, process telegraf query msg", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, process telegraf query msg", pContext, pContext->fd);
HttpParser *pParser = &pContext->parser; HttpParser *pParser = &pContext->parser;
char * filter = pParser->data.pos; char * filter = pParser->data.pos;
...@@ -818,7 +818,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { ...@@ -818,7 +818,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
cJSON *metrics = cJSON_GetObjectItem(root, "metrics"); cJSON *metrics = cJSON_GetObjectItem(root, "metrics");
if (metrics != NULL) { if (metrics != NULL) {
int size = cJSON_GetArraySize(metrics); int size = cJSON_GetArraySize(metrics);
httpDebug("context:%p, fd:%d, ip:%s, multiple metrics:%d at one time", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, multiple metrics:%d at one time", pContext, pContext->fd,
size); size);
if (size <= 0) { if (size <= 0) {
httpSendErrorResp(pContext, HTTP_TG_METRICS_NULL); httpSendErrorResp(pContext, HTTP_TG_METRICS_NULL);
...@@ -859,7 +859,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { ...@@ -859,7 +859,7 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) {
} }
} }
} else { } else {
httpDebug("context:%p, fd:%d, ip:%s, single metric", pContext, pContext->fd, pContext->ipstr); httpDebug("context:%p, fd:%d, single metric", pContext, pContext->fd);
if (!httpMallocMultiCmds(pContext, 3, HTTP_BUFFER_SIZE)) { if (!httpMallocMultiCmds(pContext, 3, HTTP_BUFFER_SIZE)) {
httpSendErrorResp(pContext, HTTP_NO_ENOUGH_MEMORY); httpSendErrorResp(pContext, HTTP_NO_ENOUGH_MEMORY);
......
...@@ -98,8 +98,8 @@ void tgBuildSqlAffectRowsJson(HttpContext *pContext, HttpSqlCmd *cmd, int affect ...@@ -98,8 +98,8 @@ void tgBuildSqlAffectRowsJson(HttpContext *pContext, HttpSqlCmd *cmd, int affect
bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
httpDebug("context:%p, fd:%d, ip:%s, check telegraf command, code:%s, state:%d, type:%d, rettype:%d, tags:%d", httpDebug("context:%p, fd:%d, check telegraf command, code:%s, state:%d, type:%d, rettype:%d, tags:%d", pContext,
pContext, pContext->fd, pContext->ipstr, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType, cmd->tagNum); pContext->fd, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType, cmd->tagNum);
if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) { if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) {
if (cmd->cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { if (cmd->cmdState == HTTP_CMD_STATE_NOT_RUN_YET) {
...@@ -107,16 +107,14 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { ...@@ -107,16 +107,14 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED;
if (multiCmds->cmds[0].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { if (multiCmds->cmds[0].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) {
multiCmds->pos = (int16_t)-1; multiCmds->pos = (int16_t)-1;
httpDebug("context:%p, fd:%d, ip:%s, import failed, try create database", pContext, pContext->fd, httpDebug("context:%p, fd:%d, import failed, try create database", pContext, pContext->fd);
pContext->ipstr);
return false; return false;
} }
} else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED;
if (multiCmds->cmds[multiCmds->pos - 1].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) { if (multiCmds->cmds[multiCmds->pos - 1].cmdState == HTTP_CMD_STATE_NOT_RUN_YET) {
multiCmds->pos = (int16_t)(multiCmds->pos - 2); multiCmds->pos = (int16_t)(multiCmds->pos - 2);
httpDebug("context:%p, fd:%d, ip:%s, import failed, try create stable", pContext, pContext->fd, httpDebug("context:%p, fd:%d, import failed, try create stable", pContext, pContext->fd);
pContext->ipstr);
return false; return false;
} }
} else { } else {
...@@ -125,11 +123,10 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { ...@@ -125,11 +123,10 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
} }
} else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_DB) { } else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_DB) {
cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED;
httpDebug("context:%p, fd:%d, ip:%s, code:%s, create database failed", pContext, pContext->fd, pContext->ipstr, httpDebug("context:%p, fd:%d, code:%s, create database failed", pContext, pContext->fd, tstrerror(code));
tstrerror(code));
} else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_STBALE) { } else if (cmd->cmdType == HTTP_CMD_TYPE_CREATE_STBALE) {
cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED; cmd->cmdState = HTTP_CMD_STATE_RUN_FINISHED;
httpDebug("context:%p, fd:%d, ip:%s, code:%s, create stable failed", pContext, pContext->fd, pContext->ipstr, tstrerror(code)); httpDebug("context:%p, fd:%d, code:%s, create stable failed", pContext, pContext->fd, tstrerror(code));
} else { } else {
} }
...@@ -138,9 +135,9 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { ...@@ -138,9 +135,9 @@ bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
void tgSetNextCmd(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) { void tgSetNextCmd(struct HttpContext *pContext, HttpSqlCmd *cmd, int code) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
httpDebug("context:%p, fd:%d, ip:%s, get telegraf next command, pos:%d, code:%s, state:%d, type:%d, rettype:%d, tags:%d", httpDebug("context:%p, fd:%d, get telegraf next command, pos:%d, code:%s, state:%d, type:%d, rettype:%d, tags:%d",
pContext, pContext->fd, pContext->ipstr, multiCmds->pos, tstrerror(code), cmd->cmdState, cmd->cmdType, pContext, pContext->fd, multiCmds->pos, tstrerror(code), cmd->cmdState, cmd->cmdType, cmd->cmdReturnType,
cmd->cmdReturnType, cmd->tagNum); cmd->tagNum);
if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) { if (cmd->cmdType == HTTP_CMD_TYPE_INSERT) {
multiCmds->pos = (int16_t)(multiCmds->pos + 2); multiCmds->pos = (int16_t)(multiCmds->pos + 2);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册