提交 37a0b9c7 编写于 作者: dengyihao's avatar dengyihao

support compress

上级 4507776c
...@@ -82,8 +82,8 @@ typedef struct SRpcInit { ...@@ -82,8 +82,8 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled int32_t idleTime; // milliseconds, 0 means idle timer is disabled
const int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
const int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
......
...@@ -71,7 +71,8 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -71,7 +71,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, " tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
" elapsed:%.2f ms, "
"current:%d, app current:%d", "current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
...@@ -84,7 +85,7 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -84,7 +85,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64, "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.planEnd - pRequest->metric.semanticEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
...@@ -144,6 +145,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -144,6 +145,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to server"); tscError("failed to init connection to server");
......
...@@ -868,10 +868,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -868,10 +868,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return code; return code;
} }
//todo refacto the error code mgmt // todo refacto the error code mgmt
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param; SRequestObj* pRequest = (SRequestObj*)param;
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
pRequest->code = code; pRequest->code = code;
if (pResult) { if (pResult) {
...@@ -899,8 +899,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -899,8 +899,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest->requestId); pRequest->requestId);
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self,
pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
qDestroyQuery(pRequest->pQuery); qDestroyQuery(pRequest->pQuery);
...@@ -1970,6 +1970,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de ...@@ -1970,6 +1970,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.sessions = 16; rpcInit.sessions = 16;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
......
...@@ -277,6 +277,7 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -277,6 +277,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp; rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
......
...@@ -657,7 +657,8 @@ int32_t udfdOpenClientRpc() { ...@@ -657,7 +657,8 @@ int32_t udfdOpenClientRpc() {
rpcInit.user = TSDB_DEFAULT_USER; rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global; rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp; rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) { if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client"); fnError("failed to init dnode rpc client");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册