From fbe9b5f8529fdf0cefbcf419099119d44f6e3321 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 26 Nov 2019 14:30:36 +0800 Subject: [PATCH] [tbase-933] --- src/client/src/tscServer.c | 2 +- src/inc/taosmsg.h | 2 +- src/inc/tglobalcfg.h | 2 + src/rpc/src/trpc.c | 127 +++++++++++++++++++++++++----- src/system/detail/src/mgmtMeter.c | 2 +- src/util/src/tglobalcfg.c | 1 + 6 files changed, 112 insertions(+), 24 deletions(-) mode change 100644 => 100755 src/rpc/src/trpc.c diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 6a66b860d7..d62959295f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { */ if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) { if (pMeterMetaInfo->pMeterMeta) { - tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql, + tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); } tscWaitingForCreateTable(&pSql->cmd); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 845090826a..3f6e56b682 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -224,7 +224,7 @@ typedef struct { char meterId[TSDB_UNI_LEN]; uint16_t port; // for UDP only char empty[1]; - char msgType; + uint8_t msgType; int32_t msgLen; uint8_t content[0]; } STaosHeader; diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index 8f0cf79fe6..ede3c97ce9 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option); #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 +#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) + #ifdef __cplusplus } #endif diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c old mode 100644 new mode 100755 index 643622dfa7..34d94fcc09 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -14,7 +14,6 @@ */ #include "os.h" - #include "shash.h" #include "taosmsg.h" #include "tidpool.h" @@ -30,6 +29,7 @@ #include "ttimer.h" #include "tudp.h" #include "tutil.h" +#include "lz4.h" #pragma GCC diagnostic ignored "-Wpointer-to-int-cast" @@ -50,8 +50,7 @@ typedef struct { char encrypt; uint8_t secret[TSDB_KEY_LEN]; uint8_t ckey[TSDB_KEY_LEN]; - - uint16_t localPort; // for UDP only + uint16_t localPort; // for UDP only uint32_t peerUid; uint32_t peerIp; // peer IP uint16_t peerPort; // peer port @@ -66,7 +65,7 @@ typedef struct { void * chandle; // handle passed by TCP/UDP connection layer void * ahandle; // handle returned by upper app layter int retry; - int tretry; // total retry + int tretry; // total retry void * pTimer; void * pIdleTimer; char * pRspMsg; @@ -79,7 +78,7 @@ typedef struct { typedef struct { int sessions; - void * qhandle; // for scheduler + void * qhandle; // for scheduler SRpcConn * connList; void * idPool; void * tmrCtrl; @@ -94,11 +93,11 @@ typedef struct rpc_server { int mask; int numOfChanns; int numOfThreads; - int idMgmt; // ID management method + int idMgmt; // ID management method int type; - int idleTime; // milliseconds; - int noFree; // do not free the request msg when rsp is received - int index; // for UDP server, next thread for new connection + int idleTime; // milliseconds; + int noFree; // do not free the request msg when rsp is received + int index; // for UDP server, next thread for new connection uint16_t localPort; char label[12]; void *(*fp)(char *, void *ahandle, void *thandle); @@ -107,8 +106,7 @@ typedef struct rpc_server { SRpcChann *channList; } STaosRpc; - -int tsRpcProgressTime = 10; // milliseocnds +int tsRpcProgressTime = 10; // milliseocnds // not configurable int tsRpcMaxRetry; @@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg); int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) { + STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); + int32_t overhead = sizeof(int32_t) * 2; + int32_t finalLen = 0; + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char *buf = malloc (contLen + overhead + 8); // 16 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen < contLen - overhead) { + //tDump(pCont, contLen); + int32_t *pLen = (int32_t *)pCont; + + *pLen = 0; // first 4 bytes must be zero + pLen = (int32_t *)(pCont + sizeof(int32_t)); + + *pLen = htonl(contLen); // contLen is encoded in second 4 bytes + memcpy(pCont + overhead, buf, compLen); + + pHeader->comp = 1; + tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen); + + finalLen = compLen + overhead; + //tDump(pCont, contLen); + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) { + int overhead = sizeof(int32_t) * 2; + + if (pHeader->comp == 0) { + pSchedMsg->msg = (char *)(&(pHeader->destId)); + return pHeader; + } + + // decompress the content + assert(GET_INT32_VAL(pHeader->content) == 0); + + // contLen is original message length before compression applied + int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t))); + + // prepare the temporary buffer to decompress message + char *buf = malloc(sizeof(STaosHeader) + contLen); + + //tDump(pHeader->content, msgLen); + + if (buf) { + int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader), + msgLen - overhead, contLen); + + memcpy(buf, pHeader, sizeof(STaosHeader)); + free(pHeader); // free the compressed message buffer + + STaosHeader* pNewHeader = (STaosHeader *) buf; + pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg); + assert(originalLen == contLen); + + pSchedMsg->msg = (char *)(&(pNewHeader->destId)); + //tDump(pHeader->content, contLen); + return pNewHeader; + } else { + tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); + pSchedMsg->msg = NULL; + } +} + char *taosBuildReqHeader(void *param, char type, char *msg) { STaosHeader *pHeader; SRpcConn * pConn = (SRpcConn *)param; @@ -1074,8 +1155,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por if (code != 0) { // parsing error - if (pHeader->msgType & 1) { + if (pHeader->msgType & 1U) { memset(pReply, 0, sizeof(pReply)); + msgLen = taosBuildErrorMsgToPeer(data, code, pReply); (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle); tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid, @@ -1090,17 +1172,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por // parsing OK // internal communication is based on TAOS protocol, a trick here to make it efficient - pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg); - if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest); + if (pHeader->spi) msgLen -= sizeof(STaosDigest); + msgLen -= (int)sizeof(STaosHeader); + pHeader->msgLen = msgLen + (int)sizeof(SIntMsg); - if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { + if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { schedMsg.msg = NULL; // connection shall be closed } else { - schedMsg.msg = (char *)(&(pHeader->destId)); - // memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen); + pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen); } - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) { tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer); } @@ -1132,9 +1214,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { pChann = pServer->channList + pConn->chann; pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); msg = (char *)pHeader; - msgLen = contLen + (int32_t)sizeof(STaosHeader); - if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort; + if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort; + + contLen = taosCompressRpcMsg(pCont, contLen); + + msgLen = contLen + (int32_t)sizeof(STaosHeader); if (pConn->spi) { // add auth part @@ -1151,7 +1236,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { pthread_mutex_lock(&pChann->mutex); msgType = pHeader->msgType; - if ((msgType & 1) == 0) { + if ((msgType & 1U) == 0) { // response pConn->inType = 0; tfree(pConn->pRspMsg); diff --git a/src/system/detail/src/mgmtMeter.c b/src/system/detail/src/mgmtMeter.c index f1e12d763f..006fd58a8a 100644 --- a/src/system/detail/src/mgmtMeter.c +++ b/src/system/detail/src/mgmtMeter.c @@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { // send create message to the selected vnode servers if (pCreate->numOfTags == 0) { - mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d", + mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode); grantAddTimeSeries(pMeter->numOfColumns - 1); diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 0dd0e4e2ba..cef11d30cb 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -644,6 +644,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT, 0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE); + // socket type, udp by default tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, -- GitLab