diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 1c9674338138e56c95dd0b395417a366f06fdad8..9d61b0df68f3480834fddacd5d701ee0559352e8 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -58,6 +58,12 @@ # The server and client should have the same socket type. Otherwise, connect will fail. # sockettype udp +# The compressed rpc message, option: +# -1 (no compression) +# 0 (all message compressed), +# > 0 (rpc message body which larger than this value will be compressed) +# compressMsgSize -1 + # RPC re-try timer, millisecond # rpcTimer 300 diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f6c4dd2287dfa75bb482995724498e43eaf0e8e0..76e501b2db3fc00610a0e323eb20755b024a3b85 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { */ if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) { if (pMeterMetaInfo->pMeterMeta) { - tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql, + tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); } tscWaitingForCreateTable(&pSql->cmd); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b66897d8e1e454d9076cc1c40bb06b458c436ae8..256d9b7a37a872ba3af71ec038861d9c4f70877a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -225,7 +225,7 @@ typedef struct { char meterId[TSDB_UNI_LEN]; uint16_t port; // for UDP only char empty[1]; - char msgType; + uint8_t msgType; int32_t msgLen; uint8_t content[0]; } STaosHeader; diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index 8f0cf79fe63224b78be74c471b4b47eb30e7b000..ede3c97ce9f874152e8b81d78b6b1b19adc49b49 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option); #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 +#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) + #ifdef __cplusplus } #endif diff --git a/src/inc/tnote.h b/src/inc/tnote.h index 1b53be22a29aa0cdc93bf7f7502208dbfc6cb88b..4f86736be4b1075d3a6a897ce1d996b136c11b70 100644 --- a/src/inc/tnote.h +++ b/src/inc/tnote.h @@ -20,7 +20,6 @@ extern "C" { #endif -#include "unistd.h" #include "os.h" #include "tutil.h" #include "tglobalcfg.h" diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index e2897da698b3891ddcf06e8ba17813008bef10d8..2eb98935566a74f712c3dc3db3880ecf84d066fd 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -21,6 +21,7 @@ #include "shellCommand.h" #include "ttime.h" #include "tutil.h" +#include /**************** Global variables ****************/ #ifdef WINDOWS diff --git a/src/os/windows/inc/os.h b/src/os/windows/inc/os.h index a1a4bdfa5c47e97010c05451a97ea363de928d60..9c0add2c319829e10d192de8a94dc43038c3155d 100644 --- a/src/os/windows/inc/os.h +++ b/src/os/windows/inc/os.h @@ -16,20 +16,30 @@ #ifndef TDENGINE_PLATFORM_WINDOWS_H #define TDENGINE_PLATFORM_WINDOWS_H +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include #include +#include +#include #include -#include -#include -#include -#include +#include +#include +#include #include "winsock2.h" #include -#include -#include -#include -#include -#include #ifdef __cplusplus extern "C" { @@ -366,6 +376,8 @@ int fsendfile(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count); char *strndup(const char *s, size_t n); +void taosSetCoreDump(); + #ifdef __cplusplus } #endif diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 9089f90d6ad8b0ae2c3bfeee9f1043c29926f144..98be6b60ba16e52b2177971d95930f7f717785aa 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -394,4 +394,6 @@ char *strndup(const char *s, size_t n) { memcpy(r, s, len); r[len] = 0; return r; -} \ No newline at end of file +} + +void taosSetCoreDump() {} \ No newline at end of file diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c old mode 100644 new mode 100755 index c855a4dd8233d94f82b159f189f53426f10e691f..d84746817dd524fbb9e7a5b4aea1631ffda1c9ab --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -14,7 +14,6 @@ */ #include "os.h" - #include "shash.h" #include "taosmsg.h" #include "tidpool.h" @@ -30,6 +29,7 @@ #include "ttimer.h" #include "tudp.h" #include "tutil.h" +#include "lz4.h" #pragma GCC diagnostic ignored "-Wpointer-to-int-cast" @@ -50,8 +50,7 @@ typedef struct { char encrypt; uint8_t secret[TSDB_KEY_LEN]; uint8_t ckey[TSDB_KEY_LEN]; - - uint16_t localPort; // for UDP only + uint16_t localPort; // for UDP only uint32_t peerUid; uint32_t peerIp; // peer IP uint16_t peerPort; // peer port @@ -66,7 +65,7 @@ typedef struct { void * chandle; // handle passed by TCP/UDP connection layer void * ahandle; // handle returned by upper app layter int retry; - int tretry; // total retry + int tretry; // total retry void * pTimer; void * pIdleTimer; char * pRspMsg; @@ -79,7 +78,7 @@ typedef struct { typedef struct { int sessions; - void * qhandle; // for scheduler + void * qhandle; // for scheduler SRpcConn * connList; void * idPool; void * tmrCtrl; @@ -94,11 +93,11 @@ typedef struct rpc_server { int mask; int numOfChanns; int numOfThreads; - int idMgmt; // ID management method + int idMgmt; // ID management method int type; - int idleTime; // milliseconds; - int noFree; // do not free the request msg when rsp is received - int index; // for UDP server, next thread for new connection + int idleTime; // milliseconds; + int noFree; // do not free the request msg when rsp is received + int index; // for UDP server, next thread for new connection uint16_t localPort; char label[12]; void *(*fp)(char *, void *ahandle, void *thandle); @@ -107,8 +106,7 @@ typedef struct rpc_server { SRpcChann *channList; } STaosRpc; - -int tsRpcProgressTime = 10; // milliseocnds +int tsRpcProgressTime = 10; // milliseocnds // not configurable int tsRpcMaxRetry; @@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg); int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); +static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) { + STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); + int32_t overhead = sizeof(int32_t) * 2; + int32_t finalLen = 0; + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char *buf = malloc (contLen + overhead + 8); // 16 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen < contLen - overhead) { + //tDump(pCont, contLen); + int32_t *pLen = (int32_t *)pCont; + + *pLen = 0; // first 4 bytes must be zero + pLen = (int32_t *)(pCont + sizeof(int32_t)); + + *pLen = htonl(contLen); // contLen is encoded in second 4 bytes + memcpy(pCont + overhead, buf, compLen); + + pHeader->comp = 1; + tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen); + + finalLen = compLen + overhead; + //tDump(pCont, contLen); + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) { + int overhead = sizeof(int32_t) * 2; + + if (pHeader->comp == 0) { + pSchedMsg->msg = (char *)(&(pHeader->destId)); + return pHeader; + } + + // decompress the content + assert(GET_INT32_VAL(pHeader->content) == 0); + + // contLen is original message length before compression applied + int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t))); + + // prepare the temporary buffer to decompress message + char *buf = malloc(sizeof(STaosHeader) + contLen); + + //tDump(pHeader->content, msgLen); + + if (buf) { + int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader), + msgLen - overhead, contLen); + + memcpy(buf, pHeader, sizeof(STaosHeader)); + free(pHeader); // free the compressed message buffer + + STaosHeader* pNewHeader = (STaosHeader *) buf; + pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg); + assert(originalLen == contLen); + + pSchedMsg->msg = (char *)(&(pNewHeader->destId)); + //tDump(pHeader->content, contLen); + return pNewHeader; + } else { + tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); + pSchedMsg->msg = NULL; + } +} + char *taosBuildReqHeader(void *param, char type, char *msg) { STaosHeader *pHeader; SRpcConn * pConn = (SRpcConn *)param; @@ -1076,8 +1157,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por if (code != 0) { // parsing error - if (pHeader->msgType & 1) { + if (pHeader->msgType & 1U) { memset(pReply, 0, sizeof(pReply)); + msgLen = taosBuildErrorMsgToPeer(data, code, pReply); (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle); tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid, @@ -1092,17 +1174,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por // parsing OK // internal communication is based on TAOS protocol, a trick here to make it efficient - pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg); - if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest); + if (pHeader->spi) msgLen -= sizeof(STaosDigest); + msgLen -= (int)sizeof(STaosHeader); + pHeader->msgLen = msgLen + (int)sizeof(SIntMsg); - if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { + if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { schedMsg.msg = NULL; // connection shall be closed } else { - schedMsg.msg = (char *)(&(pHeader->destId)); - // memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen); + pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen); } - if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { + if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) { tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer); } @@ -1134,9 +1216,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { pChann = pServer->channList + pConn->chann; pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); msg = (char *)pHeader; - msgLen = contLen + (int32_t)sizeof(STaosHeader); - if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort; + if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort; + + contLen = taosCompressRpcMsg(pCont, contLen); + + msgLen = contLen + (int32_t)sizeof(STaosHeader); if (pConn->spi) { // add auth part @@ -1153,7 +1238,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { pthread_mutex_lock(&pChann->mutex); msgType = pHeader->msgType; - if ((msgType & 1) == 0) { + if ((msgType & 1U) == 0) { // response pConn->inType = 0; tfree(pConn->pRspMsg); diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index e0a61f91c56d2b4138325190f9ffe3515729a49f..1cc487c285c7bdd3168c7558f143b651469885b6 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -346,10 +346,16 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { int real_size = 0; /* char action = SDB_TYPE_INSERT; */ - if (pTable == NULL) return -1; + if (pTable == NULL) { + sdbError("sdb tables is null"); + return -1; + } if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row)) - if (sdbGetRow(handle, row)) return -1; + if (sdbGetRow(handle, row)) { + sdbError("table:%s, failed to insert record, sdbVersion:%d", pTable->name, sdbVersion); + return -1; + } total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); @@ -408,24 +414,26 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { pTable->numOfRows++; switch (pTable->keyType) { case SDB_KEYTYPE_STRING: - sdbTrace( - "table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", - pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); + sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", + pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); + break; + case SDB_KEYTYPE_UINT32: //dnodes or mnodes + sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", + pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); break; - case SDB_KEYTYPE_UINT32: case SDB_KEYTYPE_AUTO: - sdbTrace( - "table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", - pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); + sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", + pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); break; default: - sdbTrace( - "table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", - pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); + sdbTrace("table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", + pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); break; } id = rowMeta.id; + } else { + sdbError("table:%s, failed to insert record", pTable->name); } tfree(rowHead); @@ -509,15 +517,16 @@ int sdbDeleteRow(void *handle, void *row) { sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow); switch (pTable->keyType) { case SDB_KEYTYPE_STRING: - sdbTrace( - "table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d", - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + case SDB_KEYTYPE_UINT32: //dnodes or mnodes + sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows); break; - case SDB_KEYTYPE_UINT32: case SDB_KEYTYPE_AUTO: - sdbTrace( - "table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d", - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); + sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); break; default: sdbTrace("table:%s, a record is deleted, sdbVersion:%ld id:%ld numOfRows:%d", @@ -610,15 +619,16 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { switch (pTable->keyType) { case SDB_KEYTYPE_STRING: - sdbTrace( - "table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d", - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + sdbTrace("table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + case SDB_KEYTYPE_UINT32: //dnodes or mnodes + sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows); break; - case SDB_KEYTYPE_UINT32: case SDB_KEYTYPE_AUTO: - sdbTrace( - "table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d", - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); + sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d", + pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); break; default: sdbTrace("table:%s, a record is updated, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, sdbVersion, diff --git a/src/system/detail/src/mgmtMeter.c b/src/system/detail/src/mgmtMeter.c index f1e12d763f1494c6d4e4f8ffe133fda21ad7bd71..006fd58a8ad847425fa8189aa0be85e3a224ed0e 100644 --- a/src/system/detail/src/mgmtMeter.c +++ b/src/system/detail/src/mgmtMeter.c @@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { // send create message to the selected vnode servers if (pCreate->numOfTags == 0) { - mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d", + mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode); grantAddTimeSeries(pMeter->numOfColumns - 1); diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index ae92ce44a305303ce548b473256d16c00305f9d0..ecdb70de74cf0aa46eaebb528a3e24a76a81ff4a 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -114,6 +114,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam char *path = vnodeGetDataDir(vnode, fileId); if (path == NULL) { + dError("vid:%d, fileId:%d, failed to get dataDir", vnode, fileId); return -1; } diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index a78c388441bba97a5779d97a9c57eeafcf133db4..1bc0f6370ec296f62df395f146425c0f18b7d528 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2952,11 +2952,11 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); - if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; - pVnodeFiles->dataFileSize = fstat.st_size; - - if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1; - pVnodeFiles->lastFileSize = fstat.st_size; +// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; +// pVnodeFiles->dataFileSize = fstat.st_size; +// +// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1; +// pVnodeFiles->lastFileSize = fstat.st_size; #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP /* enforce kernel to preload data when the file is mapping */ diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 0f9565a3b5d42db5532cea9d9d5411a90faf312b..14b50acf45134c7717fc000f6543c738d765ad35 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -483,13 +483,9 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) { } tfree(pQuery->pGroupbyExpr); - dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); - /* - * destory signature, in order to avoid the query process pass the object - * safety check - */ + //destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); tfree(pQInfo); } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 91e6c9527b2bf0ea8f33eecd64121b9a5472ced0..b963b9d1c00d656e7262e9486f0bba7b62d9b304 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -88,28 +88,32 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { } } - // if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) { + dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); -#ifdef CLUSTER - if (vnodeList[vnode].vnodeStatus != TSDB_VN_STATUS_MASTER) { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); - dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus); - } else { -#endif - dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); - - if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { + if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { + if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { - vnodeProcessRetrieveRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - vnodeProcessShellSubmitRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); + dTrace("vid:%d sid:%d, shell query msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); + } + } else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { + vnodeProcessRetrieveRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); + } else { + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); + dTrace("vid:%d sid:%d, shell retrieve msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); } -#ifdef CLUSTER + } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) { + vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); + } else { + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); + dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); + } + } else { + dError("%s is not processed", taosMsg[pMsg->msgType]); } -#endif return pObj; } diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index e73428353e76bfda2b11f86efeadd44b97d73138..5e84f3feadbf4f8cee66c334ac62465ed081936e 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -37,7 +37,6 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/ihash.c) LIST(APPEND SRC ./src/lz4.c) LIST(APPEND SRC ./src/shash.c) - LIST(APPEND SRC ./src/sql.c) LIST(APPEND SRC ./src/tbase64.c) LIST(APPEND SRC ./src/tcache.c) LIST(APPEND SRC ./src/tcompression.c) @@ -59,8 +58,6 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tsocket.c) LIST(APPEND SRC ./src/tstatus.c) - LIST(APPEND SRC ./src/tstoken.c) - LIST(APPEND SRC ./src/tstoken.c) LIST(APPEND SRC ./src/tstrbuild.c) LIST(APPEND SRC ./src/ttime.c) LIST(APPEND SRC ./src/ttimer.c) diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 0dd0e4e2ba76994b966dd719df8ad5fd34e321a5..cef11d30cba8fe4488a0cc6adcad7b4143f4fe8d 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -644,6 +644,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT, 0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE); + // socket type, udp by default tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, diff --git a/src/util/src/version.c b/src/util/src/version.c index 96e7ad4eadd0557795ef44edaae07c3c568308f0..35cd40a942b3f6f74f3d1dd565130201a8e7b0f6 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,4 +1,4 @@ char version[64] = "1.6.4.0"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "d04354a8ac2f7dd9ba521d755e5d484a203783d9"; -char buildinfo[512] = "Built by root at 2019-11-11 10:23"; +char gitinfo[128] = "b6e308866e315483915f4c42a2717547ed0b9d36"; +char buildinfo[512] = "Built by ubuntu at 2019-11-26 21:56";