提交 fa7e9bd6 编写于 作者: S Shengliang Guan

TD-1207

上级 2079533b
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len) { int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len) {
token[len] = '\0'; token[len] = '\0';
int32_t outlen = 0; int32_t outlen = 0;
char *base64 = (char *)base64_decode(token, len, &outlen); char * base64 = (char *)base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, basic token:%s parsed error", pContext, pContext->fd, token); httpError("context:%p, fd:%d, basic token:%s parsed error", pContext, pContext->fd, token);
free(base64); free(base64);
...@@ -49,7 +49,7 @@ int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len) ...@@ -49,7 +49,7 @@ int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len)
strncpy(pContext->user, base64, (size_t)user_len); strncpy(pContext->user, base64, (size_t)user_len);
pContext->user[user_len] = 0; pContext->user[user_len] = 0;
char *password = user + 1; char * password = user + 1;
int32_t pass_len = (int32_t)((base64 + outlen) - password); int32_t pass_len = (int32_t)((base64 + outlen) - password);
if (pass_len < 1 || pass_len >= HTTP_PASSWORD_LEN) { if (pass_len < 1 || pass_len >= HTTP_PASSWORD_LEN) {
httpError("context:%p, fd:%d, basic token:%s parse password error", pContext, pContext->fd, token); httpError("context:%p, fd:%d, basic token:%s parse password error", pContext, pContext->fd, token);
...@@ -66,7 +66,7 @@ int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len) ...@@ -66,7 +66,7 @@ int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len)
int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len) { int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len) {
token[len] = '\0'; token[len] = '\0';
int32_t outlen = 0; int32_t outlen = 0;
unsigned char *base64 = base64_decode(token, len, &outlen); unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, taosd token:%s parsed error", pContext, pContext->fd, token); httpError("context:%p, fd:%d, taosd token:%s parsed error", pContext, pContext->fd, token);
...@@ -97,7 +97,7 @@ int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len) ...@@ -97,7 +97,7 @@ int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len)
} }
int32_t httpGenTaosdAuthToken(HttpContext *pContext, char *token, int32_t maxLen) { int32_t httpGenTaosdAuthToken(HttpContext *pContext, char *token, int32_t maxLen) {
char buffer[sizeof(pContext->user) + sizeof(pContext->pass)] = {0}; char buffer[sizeof(pContext->user) + sizeof(pContext->pass)] = {0};
size_t size = sizeof(pContext->user); size_t size = sizeof(pContext->user);
tstrncpy(buffer, pContext->user, size); tstrncpy(buffer, pContext->user, size);
size = sizeof(pContext->pass); size = sizeof(pContext->pass);
......
...@@ -48,7 +48,7 @@ static void httpDestroyContext(void *data) { ...@@ -48,7 +48,7 @@ static void httpDestroyContext(void *data) {
httpRemoveContextFromEpoll(pContext); httpRemoveContextFromEpoll(pContext);
httpReleaseSession(pContext); httpReleaseSession(pContext);
atomic_sub_fetch_32(&pThread->numOfContexts, 1); atomic_sub_fetch_32(&pThread->numOfContexts, 1);
httpDebug("context:%p, is destroyed, refCount:%d data:%p thread:%s numOfContexts:%d", pContext, pContext->refCount, httpDebug("context:%p, is destroyed, refCount:%d data:%p thread:%s numOfContexts:%d", pContext, pContext->refCount,
data, pContext->pThread->label, pContext->pThread->numOfContexts); data, pContext->pThread->label, pContext->pThread->numOfContexts);
pContext->pThread = 0; pContext->pThread = 0;
...@@ -100,9 +100,7 @@ const char *httpContextStateStr(HttpContextState state) { ...@@ -100,9 +100,7 @@ const char *httpContextStateStr(HttpContextState state) {
} }
} }
void httpNotifyContextClose(HttpContext *pContext) { void httpNotifyContextClose(HttpContext *pContext) { shutdown(pContext->fd, SHUT_WR); }
shutdown(pContext->fd, SHUT_WR);
}
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) { bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
...@@ -123,8 +121,8 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -123,8 +121,8 @@ HttpContext *httpCreateContext(int32_t fd) {
pContext->ppContext = ppContext; pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false);
return pContext; return pContext;
} }
...@@ -174,7 +172,6 @@ bool httpInitContext(HttpContext *pContext) { ...@@ -174,7 +172,6 @@ bool httpInitContext(HttpContext *pContext) {
pContext->encodeMethod = NULL; pContext->encodeMethod = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd)); memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
httpTrace("context:%p, fd:%d, parsed:%d", pContext, pContext->fd, pContext->parsed); httpTrace("context:%p, fd:%d, parsed:%d", pContext, pContext->fd, pContext->parsed);
return true; return true;
} }
......
...@@ -192,7 +192,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -192,7 +192,7 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
break; break;
} }
cJSON* alias = cJSON_GetObjectItem(query, "alias"); cJSON* alias = cJSON_GetObjectItem(query, "alias");
int32_t aliasBuffer = -1; int32_t aliasBuffer = -1;
if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) {
aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring);
......
...@@ -86,7 +86,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -86,7 +86,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext); JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return false; if (jsonBuf == NULL) return false;
int32_t num_fields = taos_num_fields(result); int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
if (num_fields == 0) { if (num_fields == 0) {
return false; return false;
...@@ -101,7 +101,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -101,7 +101,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
// such as select count(*) count(*) from sys.cpu group by ipaddr interval(1d) // such as select count(*) count(*) from sys.cpu group by ipaddr interval(1d)
int32_t dataFields = -1; int32_t dataFields = -1;
int32_t groupFields = -1; int32_t groupFields = -1;
bool hasTimestamp = fields[0].type == TSDB_DATA_TYPE_TIMESTAMP; bool hasTimestamp = fields[0].type == TSDB_DATA_TYPE_TIMESTAMP;
if (hasTimestamp) { if (hasTimestamp) {
dataFields = 1; dataFields = 1;
if (num_fields > 2) groupFields = num_fields - 1; if (num_fields > 2) groupFields = num_fields - 1;
...@@ -125,15 +125,15 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -125,15 +125,15 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
cmd->numOfRows--; cmd->numOfRows--;
continue; continue;
} }
int32_t* length = taos_fetch_lengths(result); int32_t *length = taos_fetch_lengths(result);
// for group by // for group by
if (groupFields != -1) { if (groupFields != -1) {
char target[HTTP_GC_TARGET_SIZE] = {0}; char target[HTTP_GC_TARGET_SIZE] = {0};
int32_t len; int32_t len;
len = snprintf(target,HTTP_GC_TARGET_SIZE,"%s{",aliasBuffer); len = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer);
for (int32_t i = dataFields + 1; i<num_fields; i++){ for (int32_t i = dataFields + 1; i < num_fields; i++) {
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int8_t *)row[i])); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int8_t *)row[i]));
...@@ -155,9 +155,9 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -155,9 +155,9 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
if (row[i]!= NULL){ if (row[i] != NULL) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name);
memcpy(target + len, (char *) row[i], length[i]); memcpy(target + len, (char *)row[i], length[i]);
len = (int32_t)strlen(target); len = (int32_t)strlen(target);
} }
break; break;
...@@ -165,10 +165,9 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -165,10 +165,9 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-"); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-");
break; break;
} }
if(i < num_fields - 1 ){ if (i < num_fields - 1) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", "); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", ");
} }
} }
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "}"); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "}");
...@@ -217,10 +216,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -217,10 +216,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char*)row[i], fields[i].bytes); httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
if (precision == TSDB_TIME_PRECISION_MILLI) { //ms if (precision == TSDB_TIME_PRECISION_MILLI) { // ms
httpJsonInt64(jsonBuf, *((int64_t *)row[i])); httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else { } else {
httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000); httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
......
...@@ -25,25 +25,24 @@ typedef enum { ...@@ -25,25 +25,24 @@ typedef enum {
} EHTTP_GZIP_STATE; } EHTTP_GZIP_STATE;
struct ehttp_gzip_s { struct ehttp_gzip_s {
ehttp_gzip_conf_t conf; ehttp_gzip_conf_t conf;
ehttp_gzip_callbacks_t callbacks; ehttp_gzip_callbacks_t callbacks;
void *arg; void * arg;
z_stream *gzip; z_stream * gzip;
gz_header *header; gz_header * header;
char *chunk; char * chunk;
int32_t state;
int32_t state;
}; };
static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len) { static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len) {}
}
static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) { static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) {
switch(gzip->state) { switch (gzip->state) {
case EHTTP_GZIP_READY: { case EHTTP_GZIP_READY: {
inflateEnd(gzip->gzip); inflateEnd(gzip->gzip);
} break; } break;
default: break; default:
break;
} }
if (gzip->gzip) { if (gzip->gzip) {
free(gzip->gzip); free(gzip->gzip);
...@@ -60,43 +59,43 @@ static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) { ...@@ -60,43 +59,43 @@ static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) {
gzip->state = EHTTP_GZIP_CLOSED; gzip->state = EHTTP_GZIP_CLOSED;
} }
ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) { ehttp_gzip_t *ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) {
ehttp_gzip_t *gzip = (ehttp_gzip_t*)calloc(1, sizeof(*gzip)); ehttp_gzip_t *gzip = (ehttp_gzip_t *)calloc(1, sizeof(*gzip));
if (!gzip) return NULL; if (!gzip) return NULL;
do { do {
gzip->conf = conf; gzip->conf = conf;
gzip->callbacks = callbacks; gzip->callbacks = callbacks;
gzip->arg = arg; gzip->arg = arg;
if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data; if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data;
gzip->gzip = (z_stream*)calloc(1, sizeof(*gzip->gzip)); gzip->gzip = (z_stream *)calloc(1, sizeof(*gzip->gzip));
if (gzip->conf.get_header) { if (gzip->conf.get_header) {
gzip->header = (gz_header*)calloc(1, sizeof(*gzip->header)); gzip->header = (gz_header *)calloc(1, sizeof(*gzip->header));
} }
if (gzip->conf.chunk_size<=0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT; if (gzip->conf.chunk_size <= 0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT;
gzip->chunk = (char*)malloc(gzip->conf.chunk_size); gzip->chunk = (char *)malloc(gzip->conf.chunk_size);
if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break; if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break;
gzip->gzip->zalloc = Z_NULL; gzip->gzip->zalloc = Z_NULL;
gzip->gzip->zfree = Z_NULL; gzip->gzip->zfree = Z_NULL;
gzip->gzip->opaque = Z_NULL; gzip->gzip->opaque = Z_NULL;
// 863 windowBits can also be greater than 15 for optional gzip decoding. Add // 863 windowBits can also be greater than 15 for optional gzip decoding. Add
// 864 32 to windowBits to enable zlib and gzip decoding with automatic header // 864 32 to windowBits to enable zlib and gzip decoding with automatic header
// 865 detection, or add 16 to decode only the gzip format (the zlib format will // 865 detection, or add 16 to decode only the gzip format (the zlib format will
// 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a // 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a
// 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see // 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see
// 868 below), inflate() will not automatically decode concatenated gzip streams. // 868 below), inflate() will not automatically decode concatenated gzip streams.
// 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state // 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state
// 870 would need to be reset to continue decoding a subsequent gzip stream. // 870 would need to be reset to continue decoding a subsequent gzip stream.
int32_t ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS int32_t ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS
if (ret != Z_OK) break; if (ret != Z_OK) break;
if (gzip->header) { if (gzip->header) {
ret = inflateGetHeader(gzip->gzip, gzip->header); ret = inflateGetHeader(gzip->gzip, gzip->header);
} }
if (ret != Z_OK) break; if (ret != Z_OK) break;
gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; gzip->gzip->next_out = (z_const Bytef *)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size; gzip->gzip->avail_out = gzip->conf.chunk_size;
gzip->state = EHTTP_GZIP_READY; gzip->state = EHTTP_GZIP_READY;
return gzip; return gzip;
} while (0); } while (0);
...@@ -105,7 +104,7 @@ ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_ ...@@ -105,7 +104,7 @@ ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_
return NULL; return NULL;
} }
ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); ehttp_gzip_t *ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
void ehttp_gzip_destroy(ehttp_gzip_t *gzip) { void ehttp_gzip_destroy(ehttp_gzip_t *gzip) {
ehttp_gzip_cleanup(gzip); ehttp_gzip_cleanup(gzip);
...@@ -129,16 +128,16 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) { ...@@ -129,16 +128,16 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) {
} }
if (ret != Z_OK && ret != Z_STREAM_END) return -1; if (ret != Z_OK && ret != Z_STREAM_END) return -1;
if (gzip->gzip->avail_out>0) { if (gzip->gzip->avail_out > 0) {
if (ret!=Z_STREAM_END) continue; if (ret != Z_STREAM_END) continue;
} }
int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk); int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk);
gzip->gzip->next_out[0] = '\0'; gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; gzip->gzip->next_out = (z_const Bytef *)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size; gzip->gzip->avail_out = gzip->conf.chunk_size;
} }
return 0; return 0;
...@@ -147,21 +146,20 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) { ...@@ -147,21 +146,20 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) {
int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip) { int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip) {
if (gzip->state != EHTTP_GZIP_READY) return -1; if (gzip->state != EHTTP_GZIP_READY) return -1;
gzip->gzip->next_in = NULL; gzip->gzip->next_in = NULL;
gzip->gzip->avail_in = 0; gzip->gzip->avail_in = 0;
int32_t ret; int32_t ret;
ret = inflate(gzip->gzip, Z_FINISH); ret = inflate(gzip->gzip, Z_FINISH);
if (ret != Z_STREAM_END) return -1; if (ret != Z_STREAM_END) return -1;
int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk); int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk);
gzip->gzip->next_out[0] = '\0'; gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
gzip->gzip->next_out = NULL; gzip->gzip->next_out = NULL;
gzip->gzip->avail_out = 0; gzip->gzip->avail_out = 0;
return 0; return 0;
} }
...@@ -48,7 +48,7 @@ bool httpProcessData(HttpContext* pContext) { ...@@ -48,7 +48,7 @@ bool httpProcessData(HttpContext* pContext) {
/* /*
* httpCloseContextByApp has been called when parsing the error * httpCloseContextByApp has been called when parsing the error
*/ */
//httpCloseContextByApp(pContext); // httpCloseContextByApp(pContext);
} else { } else {
httpProcessRequest(pContext); httpProcessRequest(pContext);
} }
......
...@@ -44,20 +44,21 @@ int32_t httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int32_t ...@@ -44,20 +44,21 @@ int32_t httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int32_t
int32_t writeLen = 0; int32_t writeLen = 0;
do { do {
if (pContext->fd > 2){ if (pContext->fd > 2) {
len = (int32_t)taosSend(pContext->fd, buf + writeLen, (size_t)(sz - writeLen), MSG_NOSIGNAL); len = (int32_t)taosSend(pContext->fd, buf + writeLen, (size_t)(sz - writeLen), MSG_NOSIGNAL);
} } else {
else {
return sz; return sz;
} }
if (len < 0) { if (len < 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, times:%d", pContext, pContext->fd, errno, strerror(errno), countWait); httpDebug("context:%p, fd:%d, socket write errno:%d:%s, times:%d", pContext, pContext->fd, errno, strerror(errno),
countWait);
if (++countWait > HTTP_WRITE_RETRY_TIMES) break; if (++countWait > HTTP_WRITE_RETRY_TIMES) break;
taosMsleep(HTTP_WRITE_WAIT_TIME_MS); taosMsleep(HTTP_WRITE_WAIT_TIME_MS);
continue; continue;
} else if (len == 0) { } else if (len == 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, connect already closed", pContext, pContext->fd, errno, strerror(errno)); httpDebug("context:%p, fd:%d, socket write errno:%d:%s, connect already closed", pContext, pContext->fd, errno,
strerror(errno));
break; break;
} else { } else {
countWait = 0; countWait = 0;
...@@ -80,7 +81,7 @@ int32_t httpWriteBuf(struct HttpContext* pContext, const char* buf, int32_t sz) ...@@ -80,7 +81,7 @@ int32_t httpWriteBuf(struct HttpContext* pContext, const char* buf, int32_t sz)
return writeSz; return writeSz;
} }
int32_t httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int32_t sz) { int32_t httpWriteBufNoTrace(struct HttpContext* pContext, const char* buf, int32_t sz) {
int32_t writeSz = httpWriteBufByFd(pContext, buf, sz); int32_t writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz != sz) { if (writeSz != sz) {
httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response", pContext, pContext->fd, sz, httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response", pContext, pContext->fd, sz,
...@@ -92,8 +93,8 @@ int32_t httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int32 ...@@ -92,8 +93,8 @@ int32_t httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int32
int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
int32_t remain = 0; int32_t remain = 0;
char sLen[24]; char sLen[24];
int32_t srcLen = (int32_t) (buf->lst - buf->buf); int32_t srcLen = (int32_t)(buf->lst - buf->buf);
if (buf->pContext->fd <= 0) { if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd); httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd);
...@@ -113,21 +114,21 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { ...@@ -113,21 +114,21 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd); httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd);
return 0; // there is no data to dump. return 0; // there is no data to dump.
} else { } else {
int32_t len = sprintf(sLen, "%d\r\n", srcLen); int32_t len = sprintf(sLen, "%x\r\n", srcLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%d, response:\n%s", buf->pContext, buf->pContext->fd, httpTrace("context:%p, fd:%d, write body, chunkSize:%d, response:\n%s", buf->pContext, buf->pContext->fd, srcLen,
srcLen, buf->buf); buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, buf->buf, srcLen); remain = httpWriteBufNoTrace(buf->pContext, buf->buf, srcLen);
} }
} else { } else {
char compressBuf[JSON_BUFFER_SIZE] = {0}; char compressBuf[JSON_BUFFER_SIZE] = {0};
int32_t compressBufLen = JSON_BUFFER_SIZE; int32_t compressBufLen = JSON_BUFFER_SIZE;
int32_t ret = httpGzipCompress(buf->pContext, buf->buf, srcLen, compressBuf, &compressBufLen, isTheLast); int32_t ret = httpGzipCompress(buf->pContext, buf->buf, srcLen, compressBuf, &compressBufLen, isTheLast);
if (ret == 0) { if (ret == 0) {
if (compressBufLen > 0) { if (compressBufLen > 0) {
int32_t len = sprintf(sLen, "%x\r\n", compressBufLen); int32_t len = sprintf(sLen, "%x\r\n", compressBufLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%d, compressSize:%d, last:%d, response:\n%s", httpTrace("context:%p, fd:%d, write body, chunkSize:%d, compressSize:%d, last:%d, response:\n%s", buf->pContext,
buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf); buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen); remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen);
} else { } else {
...@@ -154,8 +155,8 @@ void httpWriteJsonBufHead(JsonBuf* buf) { ...@@ -154,8 +155,8 @@ void httpWriteJsonBufHead(JsonBuf* buf) {
buf->pContext->fd = -1; buf->pContext->fd = -1;
} }
char msg[1024] = {0}; char msg[1024] = {0};
int32_t len = -1; int32_t len = -1;
if (buf->pContext->parser->acceptEncodingGzip == 0 || !tsHttpEnableCompress) { if (buf->pContext->parser->acceptEncodingGzip == 0 || !tsHttpEnableCompress) {
len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_UN_COMPRESS], httpVersionStr[buf->pContext->parser->httpVersion], len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_UN_COMPRESS], httpVersionStr[buf->pContext->parser->httpVersion],
...@@ -256,16 +257,16 @@ void httpJsonInt64(JsonBuf* buf, int64_t num) { ...@@ -256,16 +257,16 @@ void httpJsonInt64(JsonBuf* buf, int64_t num) {
} }
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) { void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
char ts[35] = {0}; char ts[35] = {0};
struct tm *ptm; struct tm* ptm;
int32_t precision = 1000; int32_t precision = 1000;
if (us) { if (us) {
precision = 1000000; precision = 1000000;
} }
time_t tt = t / precision; time_t tt = t / precision;
ptm = localtime(&tt); ptm = localtime(&tt);
int32_t length = (int32_t) strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm); int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (us) { if (us) {
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision); length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
} else { } else {
...@@ -276,9 +277,9 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) { ...@@ -276,9 +277,9 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
} }
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) { void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) {
char ts[40] = {0}; char ts[40] = {0};
struct tm *ptm; struct tm* ptm;
int32_t precision = 1000; int32_t precision = 1000;
if (us) { if (us) {
precision = 1000000; precision = 1000000;
} }
......
...@@ -130,7 +130,7 @@ static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) { ...@@ -130,7 +130,7 @@ static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
static void httpClearString(HttpString *str) { static void httpClearString(HttpString *str) {
if (str->str) { if (str->str) {
str->str[0] = '\0'; str->str[0] = '\0';
str->pos = 0; str->pos = 0;
} }
} }
...@@ -237,7 +237,6 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const ...@@ -237,7 +237,6 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
} }
httpTrace("context:%p, fd:%d, keepAlive:%d", pContext, pContext->fd, pContext->parser->keepAlive); httpTrace("context:%p, fd:%d, keepAlive:%d", pContext, pContext->fd, pContext->parser->keepAlive);
} }
#if 0 #if 0
else if (0 == strcasecmp(key, "Content-Encoding")) { else if (0 == strcasecmp(key, "Content-Encoding")) {
if (0 == strcmp(val, "gzip")) { if (0 == strcmp(val, "gzip")) {
...@@ -246,7 +245,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const ...@@ -246,7 +245,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
} }
return 0; return 0;
} }
#endif #endif
else if (0 == strcasecmp(key, "Transfer-Encoding") || 0 == strcasecmp(key, "Content-Encoding")) { else if (0 == strcasecmp(key, "Transfer-Encoding") || 0 == strcasecmp(key, "Content-Encoding")) {
if (strstr(val, "gzip")) { if (strstr(val, "gzip")) {
...@@ -349,7 +348,7 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) { ...@@ -349,7 +348,7 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
newSize = MIN(newSize, HTTP_BUFFER_SIZE); newSize = MIN(newSize, HTTP_BUFFER_SIZE);
buf->str = realloc(buf->str, newSize); buf->str = realloc(buf->str, newSize);
buf->size = newSize; buf->size = newSize;
if (buf->str == NULL) { if (buf->str == NULL) {
httpError("context:%p, fd:%d, failed parse body, realloc %d failed", pContext, pContext->fd, buf->size); httpError("context:%p, fd:%d, failed parse body, realloc %d failed", pContext, pContext->fd, buf->size);
httpOnError(parser, 0, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY); httpOnError(parser, 0, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
...@@ -410,9 +409,7 @@ static int32_t httpPopStack(HttpParser *parser) { ...@@ -410,9 +409,7 @@ static int32_t httpPopStack(HttpParser *parser) {
return 0; return 0;
} }
static void httpClearStack(HttpStack *stack) { static void httpClearStack(HttpStack *stack) { stack->pos = 0; }
stack->pos = 0;
}
static int32_t httpCleanupStack(HttpStack *stack) { static int32_t httpCleanupStack(HttpStack *stack) {
free(stack->stacks); free(stack->stacks);
...@@ -451,7 +448,7 @@ void httpInitParser(HttpParser *parser) { ...@@ -451,7 +448,7 @@ void httpInitParser(HttpParser *parser) {
free(parser->key); parser->key = NULL; free(parser->key); parser->key = NULL;
free(parser->val); parser->val = NULL; free(parser->val); parser->val = NULL;
free(parser->authContent); parser->authContent = NULL; free(parser->authContent); parser->authContent = NULL;
httpClearStack(&parser->stacks); httpClearStack(&parser->stacks);
httpClearString(&parser->str); httpClearString(&parser->str);
httpClearString(&parser->body); httpClearString(&parser->body);
...@@ -497,7 +494,7 @@ void httpDestroyParser(HttpParser *parser) { ...@@ -497,7 +494,7 @@ void httpDestroyParser(HttpParser *parser) {
free(parser->key); parser->key = NULL; free(parser->key); parser->key = NULL;
free(parser->val); parser->val = NULL; free(parser->val); parser->val = NULL;
free(parser->authContent); parser->authContent = NULL; free(parser->authContent); parser->authContent = NULL;
httpCleanupStack(&parser->stacks); httpCleanupStack(&parser->stacks);
httpCleanupString(&parser->str); httpCleanupString(&parser->str);
httpCleanupString(&parser->body); httpCleanupString(&parser->body);
...@@ -513,25 +510,36 @@ void httpDestroyParser(HttpParser *parser) { ...@@ -513,25 +510,36 @@ void httpDestroyParser(HttpParser *parser) {
free(parser); free(parser);
} }
#define is_token(c) (strchr("!#$%&'*+-.^_`|~", c) || isdigit(c) || isalpha(c)) #define is_token(c) (strchr("!#$%&'*+-.^_`|~", c) || isdigit(c) || isalpha(c))
char *httpDecodeUrl(const char *enc) { char *httpDecodeUrl(const char *enc) {
int32_t ok = 1; int32_t ok = 1;
HttpString str = {0}; HttpString str = {0};
while (*enc) { while (*enc) {
char *p = strchr(enc, '%'); char *p = strchr(enc, '%');
if (!p) break; if (!p) break;
int32_t hex, cnt; int32_t hex, cnt;
int32_t n = sscanf(p+1, "%2x%n", &hex, &cnt); int32_t n = sscanf(p + 1, "%2x%n", &hex, &cnt);
if (n!=1 && cnt !=2) { ok = 0; break; } if (n != 1 && cnt != 2) {
if (httpAppendString(&str, enc, (int32_t)(p-enc))) { ok = 0; break; } ok = 0;
break;
}
if (httpAppendString(&str, enc, (int32_t)(p - enc))) {
ok = 0;
break;
}
char c = (char)hex; char c = (char)hex;
if (httpAppendString(&str, &c, 1)) { ok = 0; break; } if (httpAppendString(&str, &c, 1)) {
enc = p+3; ok = 0;
break;
}
enc = p + 3;
} }
char *dec = NULL; char *dec = NULL;
if (ok && *enc) { if (ok && *enc) {
if (httpAppendString(&str, enc, (int32_t)strlen(enc))) { ok = 0; } if (httpAppendString(&str, enc, (int32_t)strlen(enc))) {
ok = 0;
}
} }
if (ok) { if (ok) {
dec = str.str; dec = str.str;
...@@ -542,13 +550,13 @@ char *httpDecodeUrl(const char *enc) { ...@@ -542,13 +550,13 @@ char *httpDecodeUrl(const char *enc) {
} }
static void httpOnData(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len) { static void httpOnData(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len) {
HttpParser *parser = (HttpParser*)arg; HttpParser *parser = (HttpParser *)arg;
httpOnBody(parser, buf, len); httpOnBody(parser, buf, len);
} }
static int32_t httpParserOnBegin(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnBegin(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (c == 'G' || c == 'P' || c == 'H' || c == 'D' || c == 'C' || c == 'O' || c == 'T') { if (c == 'G' || c == 'P' || c == 'H' || c == 'D' || c == 'C' || c == 'O' || c == 'T') {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -570,7 +578,7 @@ static int32_t httpParserOnBegin(HttpParser *parser, HTTP_PARSER_STATE state, co ...@@ -570,7 +578,7 @@ static int32_t httpParserOnBegin(HttpParser *parser, HTTP_PARSER_STATE state, co
static int32_t httpParserOnRquestOrResponse(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnRquestOrResponse(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (parser->str.pos == 1) { if (parser->str.pos == 1) {
if (c == 'T' && parser->str.str[0] == 'H') { if (c == 'T' && parser->str.str[0] == 'H') {
...@@ -608,7 +616,7 @@ static int32_t httpParserOnRquestOrResponse(HttpParser *parser, HTTP_PARSER_STAT ...@@ -608,7 +616,7 @@ static int32_t httpParserOnRquestOrResponse(HttpParser *parser, HTTP_PARSER_STAT
static int32_t httpParserOnMethod(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnMethod(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -637,7 +645,7 @@ static int32_t httpParserOnMethod(HttpParser *parser, HTTP_PARSER_STATE state, c ...@@ -637,7 +645,7 @@ static int32_t httpParserOnMethod(HttpParser *parser, HTTP_PARSER_STATE state, c
static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (!isspace(c) && c != '\r' && c != '\n') { if (!isspace(c) && c != '\r' && c != '\n') {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -664,7 +672,7 @@ static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, c ...@@ -664,7 +672,7 @@ static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, c
static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
const char *prefix = "HTTP/1."; const char *prefix = "HTTP/1.";
int32_t len = (int32_t)strlen(prefix); int32_t len = (int32_t)strlen(prefix);
...@@ -727,7 +735,7 @@ static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state, ...@@ -727,7 +735,7 @@ static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state,
static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (c == ' ') { if (c == ' ') {
httpPopStack(parser); httpPopStack(parser);
...@@ -742,7 +750,7 @@ static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const ...@@ -742,7 +750,7 @@ static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const
static int32_t httpParserOnStatusCode(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnStatusCode(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (isdigit(c)) { if (isdigit(c)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -767,7 +775,7 @@ static int32_t httpParserOnStatusCode(HttpParser *parser, HTTP_PARSER_STATE stat ...@@ -767,7 +775,7 @@ static int32_t httpParserOnStatusCode(HttpParser *parser, HTTP_PARSER_STATE stat
static int32_t httpParserOnReasonPhrase(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnReasonPhrase(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (c == '\r') { if (c == '\r') {
parser->reasonPhrase = strdup(parser->str.str); parser->reasonPhrase = strdup(parser->str.str);
...@@ -808,10 +816,10 @@ static int32_t httpParserPostProcess(HttpParser *parser) { ...@@ -808,10 +816,10 @@ static int32_t httpParserPostProcess(HttpParser *parser) {
static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
const char *s = "\r\n"; const char *s = "\r\n";
int32_t len = (int32_t)strlen(s); int32_t len = (int32_t)strlen(s);
if (s[parser->str.pos] != c) { if (s[parser->str.pos] != c) {
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
ok = -1; ok = -1;
...@@ -838,7 +846,7 @@ static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, con ...@@ -838,7 +846,7 @@ static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, con
static int32_t httpParserOnHeader(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnHeader(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (c == '\r') { if (c == '\r') {
httpPopStack(parser); httpPopStack(parser);
...@@ -876,7 +884,7 @@ static int32_t httpParserOnHeader(HttpParser *parser, HTTP_PARSER_STATE state, c ...@@ -876,7 +884,7 @@ static int32_t httpParserOnHeader(HttpParser *parser, HTTP_PARSER_STATE state, c
static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -888,7 +896,7 @@ static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state ...@@ -888,7 +896,7 @@ static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state
break; break;
} }
if (c == ':') { if (c == ':') {
parser->key = strdup(parser->str.str); parser->key = strdup(parser->str.str);
if (!parser->key) { if (!parser->key) {
httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c);
ok = -1; ok = -1;
...@@ -908,7 +916,7 @@ static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state ...@@ -908,7 +916,7 @@ static int32_t httpParserOnHeaderKey(HttpParser *parser, HTTP_PARSER_STATE state
static int32_t httpParserOnHeaderVal(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnHeaderVal(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (c != '\r' && c != '\n' && (!isspace(c) || parser->str.pos > 0)) { if (c != '\r' && c != '\n' && (!isspace(c) || parser->str.pos > 0)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -935,10 +943,10 @@ static int32_t httpParserOnHeaderVal(HttpParser *parser, HTTP_PARSER_STATE state ...@@ -935,10 +943,10 @@ static int32_t httpParserOnHeaderVal(HttpParser *parser, HTTP_PARSER_STATE state
static int32_t httpParserOnChunkSize(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnChunkSize(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
int32_t bytes; int32_t bytes;
int32_t len; int32_t len;
int32_t n; int32_t n;
do { do {
if (isxdigit(c)) { if (isxdigit(c)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
...@@ -985,7 +993,7 @@ static int32_t httpParserOnChunkSize(HttpParser *parser, HTTP_PARSER_STATE state ...@@ -985,7 +993,7 @@ static int32_t httpParserOnChunkSize(HttpParser *parser, HTTP_PARSER_STATE state
static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c);
...@@ -1019,7 +1027,7 @@ static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, co ...@@ -1019,7 +1027,7 @@ static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, co
static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) { static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
ok = -1; ok = -1;
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
...@@ -1029,8 +1037,8 @@ static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, cons ...@@ -1029,8 +1037,8 @@ static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, cons
} }
static int32_t httpParseChar(HttpParser *parser, const char c, int32_t *again) { static int32_t httpParseChar(HttpParser *parser, const char c, int32_t *again) {
HttpContext *pContext = parser->pContext; HttpContext * pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
HTTP_PARSER_STATE state = httpTopStack(parser); HTTP_PARSER_STATE state = httpTopStack(parser);
do { do {
if (state == HTTP_PARSER_BEGIN) { if (state == HTTP_PARSER_BEGIN) {
...@@ -1119,9 +1127,9 @@ static int32_t httpParseChar(HttpParser *parser, const char c, int32_t *again) { ...@@ -1119,9 +1127,9 @@ static int32_t httpParseChar(HttpParser *parser, const char c, int32_t *again) {
int32_t httpParseBuf(HttpParser *parser, const char *buf, int32_t len) { int32_t httpParseBuf(HttpParser *parser, const char *buf, int32_t len) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
const char *p = buf; const char * p = buf;
int32_t ret = 0; int32_t ret = 0;
int32_t i = 0; int32_t i = 0;
while (i < len) { while (i < len) {
int32_t again = 0; int32_t again = 0;
......
...@@ -38,16 +38,16 @@ typedef struct { ...@@ -38,16 +38,16 @@ typedef struct {
} SHttpWorkerPool; } SHttpWorkerPool;
typedef struct { typedef struct {
void * param; void * param;
void * result; void * result;
int32_t code; int32_t code;
int32_t rows; int32_t rows;
FHttpResultFp fp; FHttpResultFp fp;
} SHttpResult; } SHttpResult;
static SHttpWorkerPool tsHttpPool; static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset; static taos_qset tsHttpQset;
static taos_queue tsHttpQueue; static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) { void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) {
if (tsHttpQueue != NULL) { if (tsHttpQueue != NULL) {
...@@ -105,7 +105,7 @@ static bool httpAllocateResultQueue() { ...@@ -105,7 +105,7 @@ static bool httpAllocateResultQueue() {
httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num); httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num);
} }
httpInfo("http result queue is opened"); httpInfo("http result queue is opened");
return true; return true;
} }
......
...@@ -160,8 +160,9 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) { ...@@ -160,8 +160,9 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) {
if (temp[i] == '\"') { if (temp[i] == '\"') {
temp[i] = '\''; temp[i] = '\'';
} else if (temp[i] == '\n') { } else if (temp[i] == '\n') {
temp[i] = ' '; temp[i] = ' ';
} else {} } else {
}
} }
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp); httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp);
......
...@@ -95,7 +95,6 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { ...@@ -95,7 +95,6 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) {
return false; return false;
} }
/* /*
* for async test * for async test
* *
......
...@@ -83,7 +83,8 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result) ...@@ -83,7 +83,8 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result)
httpJsonToken(jsonBuf, JsonArrStt); httpJsonToken(jsonBuf, JsonArrStt);
} }
bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows, int32_t timestampFormat) { bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows,
int32_t timestampFormat) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext); JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return false; if (jsonBuf == NULL) return false;
...@@ -95,7 +96,7 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -95,7 +96,7 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
if (row == NULL) { if (row == NULL) {
continue; continue;
} }
int32_t* length = taos_fetch_lengths(result); int32_t *length = taos_fetch_lengths(result);
// data row array begin // data row array begin
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
...@@ -131,15 +132,17 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -131,15 +132,17 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char*)row[i], length[i]); httpJsonStringForTransMean(jsonBuf, (char *)row[i], length[i]);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) { if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) {
httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO); httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]),
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
} else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) { } else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) {
httpJsonInt64(jsonBuf, *((int64_t *)row[i])); httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else { } else {
httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO); httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]),
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
} }
break; break;
default: default:
...@@ -148,8 +151,8 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -148,8 +151,8 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
} }
// data row array end // data row array end
httpJsonToken(jsonBuf, JsonArrEnd); httpJsonToken(jsonBuf, JsonArrEnd);
cmd->numOfRows ++; cmd->numOfRows++;
if (pContext->fd <= 0) { if (pContext->fd <= 0) {
httpError("context:%p, fd:%d, user:%s, conn closed, abort retrieve", pContext, pContext->fd, pContext->user); httpError("context:%p, fd:%d, user:%s, conn closed, abort retrieve", pContext, pContext->fd, pContext->user);
...@@ -168,15 +171,15 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -168,15 +171,15 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
} }
bool restBuildSqlTimestampJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) { bool restBuildSqlTimestampJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) {
return restBuildSqlJson(pContext,cmd, result, numOfRows, REST_TIMESTAMP_FMT_TIMESTAMP); return restBuildSqlJson(pContext, cmd, result, numOfRows, REST_TIMESTAMP_FMT_TIMESTAMP);
} }
bool restBuildSqlLocalTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) { bool restBuildSqlLocalTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) {
return restBuildSqlJson(pContext,cmd, result, numOfRows, REST_TIMESTAMP_FMT_LOCAL_STRING); return restBuildSqlJson(pContext, cmd, result, numOfRows, REST_TIMESTAMP_FMT_LOCAL_STRING);
} }
bool restBuildSqlUtcTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) { bool restBuildSqlUtcTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) {
return restBuildSqlJson(pContext,cmd, result, numOfRows, REST_TIMESTAMP_FMT_UTC_STRING); return restBuildSqlJson(pContext, cmd, result, numOfRows, REST_TIMESTAMP_FMT_UTC_STRING);
} }
void restStopSqlJson(HttpContext *pContext, HttpSqlCmd *cmd) { void restStopSqlJson(HttpContext *pContext, HttpSqlCmd *cmd) {
......
...@@ -27,19 +27,21 @@ ...@@ -27,19 +27,21 @@
static bool httpReadData(HttpContext *pContext); static bool httpReadData(HttpContext *pContext);
static void httpStopThread(HttpThread* pThread) { static void httpStopThread(HttpThread *pThread) {
pThread->stop = true; pThread->stop = true;
// signal the thread to stop, try graceful method first, // signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed // and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN }; struct epoll_event event = {.events = EPOLLIN};
eventfd_t fd = eventfd(1, 0); eventfd_t fd = eventfd(1, 0);
if (fd == -1) { if (fd == -1) {
httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno)); httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno));
pThread->stop = true; pThread->stop = true;
pthread_cancel(pThread->thread); pthread_cancel(pThread->thread);
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno)); httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno));
pthread_cancel(pThread->thread); pthread_cancel(pThread->thread);
} }
...@@ -61,7 +63,7 @@ void httpCleanUpConnect() { ...@@ -61,7 +63,7 @@ void httpCleanUpConnect() {
} }
for (int32_t i = 0; i < pServer->numOfThreads; ++i) { for (int32_t i = 0; i < pServer->numOfThreads; ++i) {
HttpThread* pThread = pServer->pThreads + i; HttpThread *pThread = pServer->pThreads + i;
if (pThread != NULL) { if (pThread != NULL) {
httpStopThread(pThread); httpStopThread(pThread);
} }
...@@ -71,8 +73,8 @@ void httpCleanUpConnect() { ...@@ -71,8 +73,8 @@ void httpCleanUpConnect() {
} }
static void httpProcessHttpData(void *param) { static void httpProcessHttpData(void *param) {
HttpServer *pServer = &tsHttpServer; HttpServer * pServer = &tsHttpServer;
HttpThread *pThread = (HttpThread *)param; HttpThread * pThread = (HttpThread *)param;
HttpContext *pContext; HttpContext *pContext;
int32_t fdNum; int32_t fdNum;
...@@ -92,8 +94,8 @@ static void httpProcessHttpData(void *param) { ...@@ -92,8 +94,8 @@ static void httpProcessHttpData(void *param) {
pContext = httpGetContext(events[i].data.ptr); pContext = httpGetContext(events[i].data.ptr);
if (pContext == NULL) { if (pContext == NULL) {
httpError("context:%p, is already released, close connect", events[i].data.ptr); httpError("context:%p, is already released, close connect", events[i].data.ptr);
//epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL); // epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
//taosClose(events[i].data.fd); // taosClose(events[i].data.fd);
continue; continue;
} }
...@@ -200,7 +202,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -200,7 +202,7 @@ static void *httpAcceptHttpConnection(void *arg) {
taosCloseSocket(connFd); taosCloseSocket(connFd);
continue; continue;
} }
#endif #endif
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
taosSetNonblocking(connFd, 1); taosSetNonblocking(connFd, 1);
...@@ -210,15 +212,15 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -210,15 +212,15 @@ static void *httpAcceptHttpConnection(void *arg) {
pContext = httpCreateContext(connFd); pContext = httpCreateContext(connFd);
if (pContext == NULL) { if (pContext == NULL) {
httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, taosInetNtoa(clientAddr.sin_addr), httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd,
htons(clientAddr.sin_port)); taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
taosCloseSocket(connFd); taosCloseSocket(connFd);
continue; continue;
} }
pContext->pThread = pThread; pContext->pThread = pThread;
sprintf(pContext->ipstr, "%s:%u", taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); sprintf(pContext->ipstr, "%s:%u", taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
struct epoll_event event; struct epoll_event event;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
event.data.ptr = pContext; event.data.ptr = pContext;
...@@ -276,7 +278,7 @@ bool httpInitConnect() { ...@@ -276,7 +278,7 @@ bool httpInitConnect() {
if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) { if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) {
httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label, httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label,
strerror(errno)); strerror(errno));
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));
return false; return false;
} }
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
...@@ -307,7 +309,7 @@ static bool httpReadData(HttpContext *pContext) { ...@@ -307,7 +309,7 @@ static bool httpReadData(HttpContext *pContext) {
} }
if (pParser->parsed) { if (pParser->parsed) {
httpDebug("context:%p, fd:%d, not in ready state, parsed:%d", pContext, pContext->fd, pParser->parsed); httpDebug("context:%p, fd:%d, not in ready state, parsed:%d", pContext, pContext->fd, pParser->parsed);
return false; return false;
} }
......
...@@ -35,7 +35,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -35,7 +35,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
session.refCount = 1; session.refCount = 1;
int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire * 1000); pContext->session =
taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire * 1000);
// void *temp = pContext->session; // void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false); // taosCacheRelease(server->sessionCache, (void **)&temp, false);
...@@ -56,7 +57,7 @@ static void httpFetchSessionImp(HttpContext *pContext) { ...@@ -56,7 +57,7 @@ static void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = &tsHttpServer; HttpServer *server = &tsHttpServer;
pthread_mutex_lock(&server->serverMutex); pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN]; char sessionId[HTTP_SESSION_ID_LEN];
int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len); pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len);
......
...@@ -64,7 +64,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32 ...@@ -64,7 +64,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32
} }
taos_free_result(result); taos_free_result(result);
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) {
(encode->stopJsonFp)(pContext, singleCmd); (encode->stopJsonFp)(pContext, singleCmd);
} }
...@@ -82,7 +82,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int32_t code, ...@@ -82,7 +82,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int32_t code,
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos; HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos;
...@@ -269,8 +269,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code ...@@ -269,8 +269,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code
pContext->user, tstrerror(code), pObj, taos_errstr(pObj)); pContext->user, tstrerror(code), pObj, taos_errstr(pObj));
httpSendTaosdInvalidSqlErrorResp(pContext, taos_errstr(pObj)); httpSendTaosdInvalidSqlErrorResp(pContext, taos_errstr(pObj));
} else { } else {
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, pContext->user,
pContext->user, tstrerror(code), pObj); tstrerror(code), pObj);
httpSendErrorResp(pContext, code); httpSendErrorResp(pContext, code);
} }
taos_free_result(result); taos_free_result(result);
...@@ -381,7 +381,7 @@ void httpExecCmd(HttpContext *pContext) { ...@@ -381,7 +381,7 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) {
HttpContext *pContext = param; HttpContext *pContext = param;
taos_free_result(result); taos_free_result(result);
if (pContext == NULL) return; if (pContext == NULL) return;
if (code < 0) { if (code < 0) {
......
...@@ -110,10 +110,8 @@ void httpCleanUpSystem() { ...@@ -110,10 +110,8 @@ void httpCleanUpSystem() {
pthread_mutex_destroy(&tsHttpServer.serverMutex); pthread_mutex_destroy(&tsHttpServer.serverMutex);
tfree(tsHttpServer.pThreads); tfree(tsHttpServer.pThreads);
tsHttpServer.pThreads = NULL; tsHttpServer.pThreads = NULL;
tsHttpServer.status = HTTP_SERVER_CLOSED; tsHttpServer.status = HTTP_SERVER_CLOSED;
} }
int32_t httpGetReqCount() { int32_t httpGetReqCount() { return atomic_exchange_32(&tsHttpServer.requestNum, 0); }
return atomic_exchange_32(&tsHttpServer.requestNum, 0);
}
...@@ -137,8 +137,8 @@ void tgInitSchemas(int32_t size) { ...@@ -137,8 +137,8 @@ void tgInitSchemas(int32_t size) {
} }
void tgParseSchemaMetric(cJSON *metric) { void tgParseSchemaMetric(cJSON *metric) {
STgSchema schema = {0}; STgSchema schema = {0};
bool parsedOk = true; bool parsedOk = true;
// name // name
cJSON *name = cJSON_GetObjectItem(metric, "name"); cJSON *name = cJSON_GetObjectItem(metric, "name");
...@@ -186,7 +186,7 @@ void tgParseSchemaMetric(cJSON *metric) { ...@@ -186,7 +186,7 @@ void tgParseSchemaMetric(cJSON *metric) {
schema.tbName = calloc(tbnameLen + 1, 1); schema.tbName = calloc(tbnameLen + 1, 1);
strcpy(schema.tbName, tbname->valuestring); strcpy(schema.tbName, tbname->valuestring);
// fields // fields
cJSON *fields = cJSON_GetObjectItem(metric, "fields"); cJSON *fields = cJSON_GetObjectItem(metric, "fields");
if (fields == NULL) { if (fields == NULL) {
goto ParseEnd; goto ParseEnd;
...@@ -227,14 +227,14 @@ ParseEnd: ...@@ -227,14 +227,14 @@ ParseEnd:
} }
} }
int32_t tgParseSchema(const char *content, char*fileName) { int32_t tgParseSchema(const char *content, char *fileName) {
cJSON *root = cJSON_Parse(content); cJSON *root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
httpError("failed to parse telegraf schema file:%s, invalid json format, content:%s", fileName, content); httpError("failed to parse telegraf schema file:%s, invalid json format, content:%s", fileName, content);
return -1; return -1;
} }
int32_t size = 0; int32_t size = 0;
cJSON *metrics = cJSON_GetObjectItem(root, "metrics"); cJSON * metrics = cJSON_GetObjectItem(root, "metrics");
if (metrics != NULL) { if (metrics != NULL) {
size = cJSON_GetArraySize(metrics); size = cJSON_GetArraySize(metrics);
if (size <= 0) { if (size <= 0) {
...@@ -277,7 +277,7 @@ int32_t tgReadSchema(char *fileName) { ...@@ -277,7 +277,7 @@ int32_t tgReadSchema(char *fileName) {
rewind(fp); rewind(fp);
char * content = (char *)calloc(contentSize + 1, 1); char * content = (char *)calloc(contentSize + 1, 1);
int32_t result = (int32_t)fread(content, 1, contentSize, fp); int32_t result = (int32_t)fread(content, 1, contentSize, fp);
if (result != contentSize) { if (result != contentSize) {
httpError("failed to read telegraf schema file:%s", fileName); httpError("failed to read telegraf schema file:%s", fileName);
fclose(fp); fclose(fp);
...@@ -296,7 +296,7 @@ int32_t tgReadSchema(char *fileName) { ...@@ -296,7 +296,7 @@ int32_t tgReadSchema(char *fileName) {
} }
void tgInitHandle(HttpServer *pServer) { void tgInitHandle(HttpServer *pServer) {
char fileName[TSDB_FILENAME_LEN*2] = {0}; char fileName[TSDB_FILENAME_LEN * 2] = {0};
sprintf(fileName, "%s/taos.telegraf.cfg", configDir); sprintf(fileName, "%s/taos.telegraf.cfg", configDir);
if (tgReadSchema(fileName) <= 0) { if (tgReadSchema(fileName) <= 0) {
tgFreeSchemas(); tgFreeSchemas();
...@@ -308,9 +308,7 @@ void tgInitHandle(HttpServer *pServer) { ...@@ -308,9 +308,7 @@ void tgInitHandle(HttpServer *pServer) {
httpAddMethod(pServer, &tgDecodeMethod); httpAddMethod(pServer, &tgDecodeMethod);
} }
void tgCleanupHandle() { void tgCleanupHandle() { tgFreeSchemas(); }
tgFreeSchemas();
}
bool tgGetUserFromUrl(HttpContext *pContext) { bool tgGetUserFromUrl(HttpContext *pContext) {
HttpParser *pParser = pContext->parser; HttpParser *pParser = pContext->parser;
...@@ -357,7 +355,7 @@ char *tgGetStableName(char *stname, cJSON *fields, int32_t fieldsSize) { ...@@ -357,7 +355,7 @@ char *tgGetStableName(char *stname, cJSON *fields, int32_t fieldsSize) {
bool schemaMatched = true; bool schemaMatched = true;
for (int32_t f = 0; f < schema->fieldNum; ++f) { for (int32_t f = 0; f < schema->fieldNum; ++f) {
char *fieldName = schema->fields[f]; char *fieldName = schema->fields[f];
bool fieldMatched = false; bool fieldMatched = false;
for (int32_t i = 0; i < fieldsSize; i++) { for (int32_t i = 0; i < fieldsSize; i++) {
cJSON *field = cJSON_GetArrayItem(fields, i); cJSON *field = cJSON_GetArrayItem(fields, i);
...@@ -469,9 +467,9 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -469,9 +467,9 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
} }
/* /*
* tag size may be larget than TSDB_COL_NAME_LEN * tag size may be larget than TSDB_COL_NAME_LEN
* we keep the first TSDB_COL_NAME_LEN bytes * we keep the first TSDB_COL_NAME_LEN bytes
*/ */
if (0) { if (0) {
if (strlen(tag->string) >= TSDB_COL_NAME_LEN) { if (strlen(tag->string) >= TSDB_COL_NAME_LEN) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_TG_TAG_NAME_SIZE); httpSendErrorResp(pContext, TSDB_CODE_HTTP_TG_TAG_NAME_SIZE);
...@@ -540,9 +538,9 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -540,9 +538,9 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
return false; return false;
} }
/* /*
* tag size may be larget than TSDB_COL_NAME_LEN * tag size may be larget than TSDB_COL_NAME_LEN
* we keep the first TSDB_COL_NAME_LEN bytes * we keep the first TSDB_COL_NAME_LEN bytes
*/ */
if (0) { if (0) {
if (strlen(field->string) >= TSDB_COL_NAME_LEN) { if (strlen(field->string) >= TSDB_COL_NAME_LEN) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_TG_FIELD_NAME_SIZE); httpSendErrorResp(pContext, TSDB_CODE_HTTP_TG_FIELD_NAME_SIZE);
...@@ -578,8 +576,8 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -578,8 +576,8 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
table_cmd->cmdType = HTTP_CMD_TYPE_INSERT; table_cmd->cmdType = HTTP_CMD_TYPE_INSERT;
// order by tag name // order by tag name
cJSON *orderedTags[TG_MAX_SORT_TAG_SIZE] = {0}; cJSON * orderedTags[TG_MAX_SORT_TAG_SIZE] = {0};
int32_t orderTagsLen = 0; int32_t orderTagsLen = 0;
for (int32_t i = 0; i < tagsSize; ++i) { for (int32_t i = 0; i < tagsSize; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
orderedTags[orderTagsLen++] = tag; orderedTags[orderTagsLen++] = tag;
...@@ -603,7 +601,8 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -603,7 +601,8 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
if (tsTelegrafUseFieldNum == 0) { if (tsTelegrafUseFieldNum == 0) {
table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname); table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname);
} else { } else {
table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen); table_cmd->stable = stable_cmd->stable =
httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen);
} }
table_cmd->stable = stable_cmd->stable = table_cmd->stable = stable_cmd->stable =
httpShrinkTableName(pContext, table_cmd->stable, httpGetCmdsString(pContext, table_cmd->stable)); httpShrinkTableName(pContext, table_cmd->stable, httpGetCmdsString(pContext, table_cmd->stable));
...@@ -627,9 +626,11 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -627,9 +626,11 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
// table name // table name
if (tsTelegrafUseFieldNum == 0) { if (tsTelegrafUseFieldNum == 0) {
table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring); table_cmd->table = stable_cmd->table =
httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring);
} else { } else {
table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring); table_cmd->table = stable_cmd->table =
httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring);
} }
for (int32_t i = 0; i < orderTagsLen; ++i) { for (int32_t i = 0; i < orderTagsLen; ++i) {
cJSON *tag = orderedTags[i]; cJSON *tag = orderedTags[i];
......
...@@ -160,8 +160,7 @@ bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferS ...@@ -160,8 +160,7 @@ bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferS
free(multiCmds->cmds); free(multiCmds->cmds);
multiCmds->cmds = (HttpSqlCmd *)malloc((size_t)cmdSize * sizeof(HttpSqlCmd)); multiCmds->cmds = (HttpSqlCmd *)malloc((size_t)cmdSize * sizeof(HttpSqlCmd));
if (multiCmds->cmds == NULL) { if (multiCmds->cmds == NULL) {
httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize);
pContext->user, cmdSize);
return false; return false;
} }
multiCmds->maxSize = (int16_t)cmdSize; multiCmds->maxSize = (int16_t)cmdSize;
...@@ -350,38 +349,40 @@ char *httpGetCmdsString(HttpContext *pContext, int32_t pos) { ...@@ -350,38 +349,40 @@ char *httpGetCmdsString(HttpContext *pContext, int32_t pos) {
} }
int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData) { int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData) {
int32_t err = 0; int32_t err = 0;
z_stream gzipStream = {0}; z_stream gzipStream = {0};
static char dummyHead[2] = { static char dummyHead[2] = {
0x8 + 0x7 * 0x10, 0x8 + 0x7 * 0x10,
(((0x8 + 0x7 * 0x10) * 0x100 + 30) / 31 * 31) & 0xFF, (((0x8 + 0x7 * 0x10) * 0x100 + 30) / 31 * 31) & 0xFF,
}; };
gzipStream.zalloc = (alloc_func) 0; gzipStream.zalloc = (alloc_func)0;
gzipStream.zfree = (free_func) 0; gzipStream.zfree = (free_func)0;
gzipStream.opaque = (voidpf) 0; gzipStream.opaque = (voidpf)0;
gzipStream.next_in = (Bytef *) srcData; gzipStream.next_in = (Bytef *)srcData;
gzipStream.avail_in = 0; gzipStream.avail_in = 0;
gzipStream.next_out = (Bytef *) destData; gzipStream.next_out = (Bytef *)destData;
if (inflateInit2(&gzipStream, 47) != Z_OK) { if (inflateInit2(&gzipStream, 47) != Z_OK) {
return -1; return -1;
} }
while (gzipStream.total_out < *nDestData && gzipStream.total_in < nSrcData) { while (gzipStream.total_out < *nDestData && gzipStream.total_in < nSrcData) {
gzipStream.avail_in = gzipStream.avail_out = nSrcData; //1 gzipStream.avail_in = gzipStream.avail_out = nSrcData; // 1
if ((err = inflate(&gzipStream, Z_NO_FLUSH)) == Z_STREAM_END) { if ((err = inflate(&gzipStream, Z_NO_FLUSH)) == Z_STREAM_END) {
break; break;
} }
if (err != Z_OK) { if (err != Z_OK) {
if (err == Z_DATA_ERROR) { if (err == Z_DATA_ERROR) {
gzipStream.next_in = (Bytef *) dummyHead; gzipStream.next_in = (Bytef *)dummyHead;
gzipStream.avail_in = sizeof(dummyHead); gzipStream.avail_in = sizeof(dummyHead);
if ((err = inflate(&gzipStream, Z_NO_FLUSH)) != Z_OK) { if ((err = inflate(&gzipStream, Z_NO_FLUSH)) != Z_OK) {
return -2; return -2;
} }
} else return -3; } else {
return -3;
}
} }
} }
...@@ -394,23 +395,25 @@ int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int3 ...@@ -394,23 +395,25 @@ int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int3
} }
int32_t httpGzipCompressInit(HttpContext *pContext) { int32_t httpGzipCompressInit(HttpContext *pContext) {
pContext->gzipStream.zalloc = (alloc_func) 0; pContext->gzipStream.zalloc = (alloc_func)0;
pContext->gzipStream.zfree = (free_func) 0; pContext->gzipStream.zfree = (free_func)0;
pContext->gzipStream.opaque = (voidpf) 0; pContext->gzipStream.opaque = (voidpf)0;
if (deflateInit2(&pContext->gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { if (deflateInit2(&pContext->gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) !=
Z_OK) {
return -1; return -1;
} }
return 0; return 0;
} }
int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData, bool isTheLast) { int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData,
bool isTheLast) {
int32_t err = 0; int32_t err = 0;
int32_t lastTotalLen = (int32_t) (pContext->gzipStream.total_out); int32_t lastTotalLen = (int32_t)(pContext->gzipStream.total_out);
pContext->gzipStream.next_in = (Bytef *) srcData; pContext->gzipStream.next_in = (Bytef *)srcData;
pContext->gzipStream.avail_in = (uLong) nSrcData; pContext->gzipStream.avail_in = (uLong)nSrcData;
pContext->gzipStream.next_out = (Bytef *) destData; pContext->gzipStream.next_out = (Bytef *)destData;
pContext->gzipStream.avail_out = (uLong) (*nDestData); pContext->gzipStream.avail_out = (uLong)(*nDestData);
while (pContext->gzipStream.avail_in != 0) { while (pContext->gzipStream.avail_in != 0) {
if (deflate(&pContext->gzipStream, Z_FULL_FLUSH) != Z_OK) { if (deflate(&pContext->gzipStream, Z_FULL_FLUSH) != Z_OK) {
...@@ -442,12 +445,12 @@ int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, ...@@ -442,12 +445,12 @@ int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData,
} }
} }
*nDestData = (int32_t) (pContext->gzipStream.total_out) - lastTotalLen; *nDestData = (int32_t)(pContext->gzipStream.total_out) - lastTotalLen;
return 0; return 0;
} }
bool httpUrlMatch(HttpContext* pContext, int32_t pos, char* cmp) { bool httpUrlMatch(HttpContext *pContext, int32_t pos, char *cmp) {
HttpParser* pParser = pContext->parser; HttpParser *pParser = pContext->parser;
if (pos < 0 || pos >= HTTP_MAX_URL) { if (pos < 0 || pos >= HTTP_MAX_URL) {
return false; return false;
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
sleep 2000
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 0 system sh/cfg.sh -n dnode1 -c wallevel -v 0
system sh/cfg.sh -n dnode1 -c http -v 1 system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c httpEnableRecordSql -v 1 system sh/cfg.sh -n dnode1 -c httpEnableRecordSql -v 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect sql connect
print ============================ dnode1 start print ============================ dnode1 start
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册