提交 40c1d4d4 编写于 作者: L lihui

Merge branch 'develop' into feature/lihui

...@@ -58,6 +58,12 @@ ...@@ -58,6 +58,12 @@
# The server and client should have the same socket type. Otherwise, connect will fail. # The server and client should have the same socket type. Otherwise, connect will fail.
# sockettype udp # sockettype udp
# The compressed rpc message, option:
# -1 (no compression)
# 0 (all message compressed),
# > 0 (rpc message body which larger than this value will be compressed)
# compressMsgSize -1
# RPC re-try timer, millisecond # RPC re-try timer, millisecond
# rpcTimer 300 # rpcTimer 300
......
...@@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { ...@@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
*/ */
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) { if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
if (pMeterMetaInfo->pMeterMeta) { if (pMeterMetaInfo->pMeterMeta) {
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql, tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql,
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
} }
tscWaitingForCreateTable(&pSql->cmd); tscWaitingForCreateTable(&pSql->cmd);
......
...@@ -225,7 +225,7 @@ typedef struct { ...@@ -225,7 +225,7 @@ typedef struct {
char meterId[TSDB_UNI_LEN]; char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only uint16_t port; // for UDP only
char empty[1]; char empty[1];
char msgType; uint8_t msgType;
int32_t msgLen; int32_t msgLen;
uint8_t content[0]; uint8_t content[0];
} STaosHeader; } STaosHeader;
......
...@@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option); ...@@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option);
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include "unistd.h"
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "shellCommand.h" #include "shellCommand.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include <regex.h>
/**************** Global variables ****************/ /**************** Global variables ****************/
#ifdef WINDOWS #ifdef WINDOWS
......
...@@ -16,20 +16,30 @@ ...@@ -16,20 +16,30 @@
#ifndef TDENGINE_PLATFORM_WINDOWS_H #ifndef TDENGINE_PLATFORM_WINDOWS_H
#define TDENGINE_PLATFORM_WINDOWS_H #define TDENGINE_PLATFORM_WINDOWS_H
#include <assert.h>
#include <ctype.h>
#include <direct.h>
#include <errno.h>
#include <fcntl.h>
#include <float.h>
#include <locale.h>
#include <intrin.h>
#include <io.h> #include <io.h>
#include <math.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h> #include <signal.h>
#include <stdint.h> #include <sys/stat.h>
#include <stdbool.h> #include <sys/types.h>
#include <pthread.h> #include <time.h>
#include <direct.h>
#include "winsock2.h" #include "winsock2.h"
#include <WS2tcpip.h> #include <WS2tcpip.h>
#include <assert.h>
#include <math.h>
#include <string.h>
#include <assert.h>
#include <intrin.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -366,6 +376,8 @@ int fsendfile(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count); ...@@ -366,6 +376,8 @@ int fsendfile(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count);
char *strndup(const char *s, size_t n); char *strndup(const char *s, size_t n);
void taosSetCoreDump();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -394,4 +394,6 @@ char *strndup(const char *s, size_t n) { ...@@ -394,4 +394,6 @@ char *strndup(const char *s, size_t n) {
memcpy(r, s, len); memcpy(r, s, len);
r[len] = 0; r[len] = 0;
return r; return r;
} }
\ No newline at end of file
void taosSetCoreDump() {}
\ No newline at end of file
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "os.h" #include "os.h"
#include "shash.h" #include "shash.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tidpool.h" #include "tidpool.h"
...@@ -30,6 +29,7 @@ ...@@ -30,6 +29,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tudp.h" #include "tudp.h"
#include "tutil.h" #include "tutil.h"
#include "lz4.h"
#pragma GCC diagnostic ignored "-Wpointer-to-int-cast" #pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
...@@ -50,8 +50,7 @@ typedef struct { ...@@ -50,8 +50,7 @@ typedef struct {
char encrypt; char encrypt;
uint8_t secret[TSDB_KEY_LEN]; uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN]; uint8_t ckey[TSDB_KEY_LEN];
uint16_t localPort; // for UDP only
uint16_t localPort; // for UDP only
uint32_t peerUid; uint32_t peerUid;
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
...@@ -66,7 +65,7 @@ typedef struct { ...@@ -66,7 +65,7 @@ typedef struct {
void * chandle; // handle passed by TCP/UDP connection layer void * chandle; // handle passed by TCP/UDP connection layer
void * ahandle; // handle returned by upper app layter void * ahandle; // handle returned by upper app layter
int retry; int retry;
int tretry; // total retry int tretry; // total retry
void * pTimer; void * pTimer;
void * pIdleTimer; void * pIdleTimer;
char * pRspMsg; char * pRspMsg;
...@@ -79,7 +78,7 @@ typedef struct { ...@@ -79,7 +78,7 @@ typedef struct {
typedef struct { typedef struct {
int sessions; int sessions;
void * qhandle; // for scheduler void * qhandle; // for scheduler
SRpcConn * connList; SRpcConn * connList;
void * idPool; void * idPool;
void * tmrCtrl; void * tmrCtrl;
...@@ -94,11 +93,11 @@ typedef struct rpc_server { ...@@ -94,11 +93,11 @@ typedef struct rpc_server {
int mask; int mask;
int numOfChanns; int numOfChanns;
int numOfThreads; int numOfThreads;
int idMgmt; // ID management method int idMgmt; // ID management method
int type; int type;
int idleTime; // milliseconds; int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection int index; // for UDP server, next thread for new connection
uint16_t localPort; uint16_t localPort;
char label[12]; char label[12];
void *(*fp)(char *, void *ahandle, void *thandle); void *(*fp)(char *, void *ahandle, void *thandle);
...@@ -107,8 +106,7 @@ typedef struct rpc_server { ...@@ -107,8 +106,7 @@ typedef struct rpc_server {
SRpcChann *channList; SRpcChann *channList;
} STaosRpc; } STaosRpc;
int tsRpcProgressTime = 10; // milliseocnds
int tsRpcProgressTime = 10; // milliseocnds
// not configurable // not configurable
int tsRpcMaxRetry; int tsRpcMaxRetry;
...@@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg); ...@@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg);
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey); int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen < contLen - overhead) {
//tDump(pCont, contLen);
int32_t *pLen = (int32_t *)pCont;
*pLen = 0; // first 4 bytes must be zero
pLen = (int32_t *)(pCont + sizeof(int32_t));
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1;
tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen);
finalLen = compLen + overhead;
//tDump(pCont, contLen);
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
int overhead = sizeof(int32_t) * 2;
if (pHeader->comp == 0) {
pSchedMsg->msg = (char *)(&(pHeader->destId));
return pHeader;
}
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = malloc(sizeof(STaosHeader) + contLen);
//tDump(pHeader->content, msgLen);
if (buf) {
int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader),
msgLen - overhead, contLen);
memcpy(buf, pHeader, sizeof(STaosHeader));
free(pHeader); // free the compressed message buffer
STaosHeader* pNewHeader = (STaosHeader *) buf;
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
assert(originalLen == contLen);
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
//tDump(pHeader->content, contLen);
return pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
pSchedMsg->msg = NULL;
}
}
char *taosBuildReqHeader(void *param, char type, char *msg) { char *taosBuildReqHeader(void *param, char type, char *msg) {
STaosHeader *pHeader; STaosHeader *pHeader;
SRpcConn * pConn = (SRpcConn *)param; SRpcConn * pConn = (SRpcConn *)param;
...@@ -1076,8 +1157,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por ...@@ -1076,8 +1157,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
if (code != 0) { if (code != 0) {
// parsing error // parsing error
if (pHeader->msgType & 1) { if (pHeader->msgType & 1U) {
memset(pReply, 0, sizeof(pReply)); memset(pReply, 0, sizeof(pReply));
msgLen = taosBuildErrorMsgToPeer(data, code, pReply); msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle); (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid, tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
...@@ -1092,17 +1174,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por ...@@ -1092,17 +1174,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
// parsing OK // parsing OK
// internal communication is based on TAOS protocol, a trick here to make it efficient // internal communication is based on TAOS protocol, a trick here to make it efficient
pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg); if (pHeader->spi) msgLen -= sizeof(STaosDigest);
if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest); msgLen -= (int)sizeof(STaosHeader);
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) { if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
schedMsg.msg = NULL; // connection shall be closed schedMsg.msg = NULL; // connection shall be closed
} else { } else {
schedMsg.msg = (char *)(&(pHeader->destId)); pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
// memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen);
} }
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid, tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer); pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
} }
...@@ -1134,9 +1216,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { ...@@ -1134,9 +1216,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
pChann = pServer->channList + pConn->chann; pChann = pServer->channList + pConn->chann;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader)); pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
msg = (char *)pHeader; msg = (char *)pHeader;
msgLen = contLen + (int32_t)sizeof(STaosHeader);
if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort; if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
contLen = taosCompressRpcMsg(pCont, contLen);
msgLen = contLen + (int32_t)sizeof(STaosHeader);
if (pConn->spi) { if (pConn->spi) {
// add auth part // add auth part
...@@ -1153,7 +1238,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) { ...@@ -1153,7 +1238,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
pthread_mutex_lock(&pChann->mutex); pthread_mutex_lock(&pChann->mutex);
msgType = pHeader->msgType; msgType = pHeader->msgType;
if ((msgType & 1) == 0) { if ((msgType & 1U) == 0) {
// response // response
pConn->inType = 0; pConn->inType = 0;
tfree(pConn->pRspMsg); tfree(pConn->pRspMsg);
......
...@@ -346,10 +346,16 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -346,10 +346,16 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
int real_size = 0; int real_size = 0;
/* char action = SDB_TYPE_INSERT; */ /* char action = SDB_TYPE_INSERT; */
if (pTable == NULL) return -1; if (pTable == NULL) {
sdbError("sdb tables is null");
return -1;
}
if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row)) if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row))
if (sdbGetRow(handle, row)) return -1; if (sdbGetRow(handle, row)) {
sdbError("table:%s, failed to insert record, sdbVersion:%d", pTable->name, sdbVersion);
return -1;
}
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
...@@ -408,24 +414,26 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -408,24 +414,26 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
pTable->numOfRows++; pTable->numOfRows++;
switch (pTable->keyType) { switch (pTable->keyType) {
case SDB_KEYTYPE_STRING: case SDB_KEYTYPE_STRING:
sdbTrace( sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
"table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break; break;
case SDB_KEYTYPE_UINT32:
case SDB_KEYTYPE_AUTO: case SDB_KEYTYPE_AUTO:
sdbTrace( sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
"table:%s, a record is inserted:%d, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break; break;
default: default:
sdbTrace( sdbTrace("table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld",
"table:%s, a record is inserted, sdbVersion:%ld id:%ld rowSize:%d numOfRows:%d fileSize:%ld", pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break; break;
} }
id = rowMeta.id; id = rowMeta.id;
} else {
sdbError("table:%s, failed to insert record", pTable->name);
} }
tfree(rowHead); tfree(rowHead);
...@@ -509,15 +517,16 @@ int sdbDeleteRow(void *handle, void *row) { ...@@ -509,15 +517,16 @@ int sdbDeleteRow(void *handle, void *row) {
sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow); sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow);
switch (pTable->keyType) { switch (pTable->keyType) {
case SDB_KEYTYPE_STRING: case SDB_KEYTYPE_STRING:
sdbTrace( sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d",
"table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%ld id:%ld numOfRows:%d",
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows);
break; break;
case SDB_KEYTYPE_UINT32:
case SDB_KEYTYPE_AUTO: case SDB_KEYTYPE_AUTO:
sdbTrace( sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d",
"table:%s, a record is deleted:%d, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break; break;
default: default:
sdbTrace("table:%s, a record is deleted, sdbVersion:%ld id:%ld numOfRows:%d", sdbTrace("table:%s, a record is deleted, sdbVersion:%ld id:%ld numOfRows:%d",
...@@ -610,15 +619,16 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -610,15 +619,16 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
switch (pTable->keyType) { switch (pTable->keyType) {
case SDB_KEYTYPE_STRING: case SDB_KEYTYPE_STRING:
sdbTrace( sdbTrace("table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d",
"table:%s, a record is updated:%s, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); break;
case SDB_KEYTYPE_UINT32: //dnodes or mnodes
sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d",
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id, pTable->numOfRows);
break; break;
case SDB_KEYTYPE_UINT32:
case SDB_KEYTYPE_AUTO: case SDB_KEYTYPE_AUTO:
sdbTrace( sdbTrace("table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d",
"table:%s, a record is updated:%d, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break; break;
default: default:
sdbTrace("table:%s, a record is updated, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, sdbVersion, sdbTrace("table:%s, a record is updated, sdbVersion:%ld id:%ld numOfRows:%d", pTable->name, sdbVersion,
......
...@@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
// send create message to the selected vnode servers // send create message to the selected vnode servers
if (pCreate->numOfTags == 0) { if (pCreate->numOfTags == 0) {
mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d", mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode); pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
grantAddTimeSeries(pMeter->numOfColumns - 1); grantAddTimeSeries(pMeter->numOfColumns - 1);
......
...@@ -114,6 +114,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam ...@@ -114,6 +114,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam
char *path = vnodeGetDataDir(vnode, fileId); char *path = vnodeGetDataDir(vnode, fileId);
if (path == NULL) { if (path == NULL) {
dError("vid:%d, fileId:%d, failed to get dataDir", vnode, fileId);
return -1; return -1;
} }
......
...@@ -2952,11 +2952,11 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles ...@@ -2952,11 +2952,11 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; // if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
pVnodeFiles->dataFileSize = fstat.st_size; // pVnodeFiles->dataFileSize = fstat.st_size;
//
if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1; // if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
pVnodeFiles->lastFileSize = fstat.st_size; // pVnodeFiles->lastFileSize = fstat.st_size;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
/* enforce kernel to preload data when the file is mapping */ /* enforce kernel to preload data when the file is mapping */
......
...@@ -483,13 +483,9 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) { ...@@ -483,13 +483,9 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
} }
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
/* //destroy signature, in order to avoid the query process pass the object safety check
* destory signature, in order to avoid the query process pass the object
* safety check
*/
memset(pQInfo, 0, sizeof(SQInfo)); memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo); tfree(pQInfo);
} }
......
...@@ -88,28 +88,32 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -88,28 +88,32 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
} }
} }
// if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) { dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
#ifdef CLUSTER if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
if (vnodeList[vnode].vnodeStatus != TSDB_VN_STATUS_MASTER) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus);
} else {
#endif
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
vnodeProcessRetrieveRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
vnodeProcessShellSubmitRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("vid:%d sid:%d, shell query msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
}
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
vnodeProcessRetrieveRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("vid:%d sid:%d, shell retrieve msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
} }
#ifdef CLUSTER } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) {
vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
}
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
} }
#endif
return pObj; return pObj;
} }
......
...@@ -37,7 +37,6 @@ ELSEIF (TD_WINDOWS_64) ...@@ -37,7 +37,6 @@ ELSEIF (TD_WINDOWS_64)
LIST(APPEND SRC ./src/ihash.c) LIST(APPEND SRC ./src/ihash.c)
LIST(APPEND SRC ./src/lz4.c) LIST(APPEND SRC ./src/lz4.c)
LIST(APPEND SRC ./src/shash.c) LIST(APPEND SRC ./src/shash.c)
LIST(APPEND SRC ./src/sql.c)
LIST(APPEND SRC ./src/tbase64.c) LIST(APPEND SRC ./src/tbase64.c)
LIST(APPEND SRC ./src/tcache.c) LIST(APPEND SRC ./src/tcache.c)
LIST(APPEND SRC ./src/tcompression.c) LIST(APPEND SRC ./src/tcompression.c)
...@@ -59,8 +58,6 @@ ELSEIF (TD_WINDOWS_64) ...@@ -59,8 +58,6 @@ ELSEIF (TD_WINDOWS_64)
LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tskiplist.c)
LIST(APPEND SRC ./src/tsocket.c) LIST(APPEND SRC ./src/tsocket.c)
LIST(APPEND SRC ./src/tstatus.c) LIST(APPEND SRC ./src/tstatus.c)
LIST(APPEND SRC ./src/tstoken.c)
LIST(APPEND SRC ./src/tstoken.c)
LIST(APPEND SRC ./src/tstrbuild.c) LIST(APPEND SRC ./src/tstrbuild.c)
LIST(APPEND SRC ./src/ttime.c) LIST(APPEND SRC ./src/ttime.c)
LIST(APPEND SRC ./src/ttimer.c) LIST(APPEND SRC ./src/ttimer.c)
......
...@@ -644,6 +644,7 @@ static void doInitGlobalConfig() { ...@@ -644,6 +644,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING, tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT,
0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE); 0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE);
// socket type, udp by default // socket type, udp by default
tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING, tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
......
char version[64] = "1.6.4.0"; char version[64] = "1.6.4.0";
char compatible_version[64] = "1.6.1.0"; char compatible_version[64] = "1.6.1.0";
char gitinfo[128] = "d04354a8ac2f7dd9ba521d755e5d484a203783d9"; char gitinfo[128] = "b6e308866e315483915f4c42a2717547ed0b9d36";
char buildinfo[512] = "Built by root at 2019-11-11 10:23"; char buildinfo[512] = "Built by ubuntu at 2019-11-26 21:56";
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册