提交 495bb309 编写于 作者: S slguan

Merge branch 'develop' into feature/slguan

......@@ -58,6 +58,12 @@
# The server and client should have the same socket type. Otherwise, connect will fail.
# sockettype udp
# The compressed rpc message, option:
# -1 (no compression)
# 0 (all message compressed),
# > 0 (rpc message body which larger than this value will be compressed)
# compressMsgSize -1
# RPC re-try timer, millisecond
# rpcTimer 300
......
......@@ -105,6 +105,7 @@ enum TSQL_TYPE {
SHOW_MODULES = 0x6c,
SHOW_CONNECTIONS = 0x6d,
SHOW_GRANTS = 0x6e,
SHOW_VNODES = 0x6f,
// create dnode
CREATE_DNODE = 0x80,
......
此差异已折叠。
......@@ -291,7 +291,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case SHOW_STREAMS:
case SHOW_SCORES:
case SHOW_GRANTS:
case SHOW_CONFIGS: {
case SHOW_CONFIGS:
case SHOW_VNODES: {
return setShowInfo(pSql, pInfo);
}
......@@ -2595,6 +2596,9 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case SHOW_CONFIGS:
pCmd->showType = TSDB_MGMT_TABLE_CONFIGS;
break;
case SHOW_VNODES:
pCmd->showType = TSDB_MGMT_TABLE_VNODES;
break;
default:
return TSDB_CODE_INVALID_SQL;
}
......@@ -2640,6 +2644,19 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
}
}
}else if (type == SHOW_VNODES) {
// show vnodes may be ip addr of dnode in payload
if (pInfo->pDCLInfo->nTokens > 0) {
SSQLToken* pDnodeIp = &pInfo->pDCLInfo->a[0];
if (pDnodeIp->n > TSDB_IPv4ADDR_LEN) { // ip addr is too long
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
}
strncpy(pCmd->payload, pDnodeIp->z, pDnodeIp->n);
pCmd->payloadLen = strdequote(pCmd->payload);
}
}
return TSDB_CODE_SUCCESS;
......
......@@ -2175,7 +2175,7 @@ int tscBuildShowMsg(SSqlObj *pSql) {
pShowMsg = (SShowMsg *)pMsg;
pShowMsg->type = pCmd->showType;
if ((pShowMsg->type == TSDB_MGMT_TABLE_TABLE || pShowMsg->type == TSDB_MGMT_TABLE_METRIC) && pCmd->payloadLen != 0) {
if ((pShowMsg->type == TSDB_MGMT_TABLE_TABLE || pShowMsg->type == TSDB_MGMT_TABLE_METRIC || pShowMsg->type == TSDB_MGMT_TABLE_VNODES ) && pCmd->payloadLen != 0) {
// only show tables support wildcard query
pShowMsg->payloadLen = htons(pCmd->payloadLen);
memcpy(pShowMsg->payload, payload, pCmd->payloadLen);
......@@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
*/
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
if (pMeterMetaInfo->pMeterMeta) {
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql,
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql,
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
}
tscWaitingForCreateTable(&pSql->cmd);
......
......@@ -73,6 +73,9 @@ cmd ::= SHOW CONFIGS. { setDCLSQLElems(pInfo, SHOW_CONFIGS, 0); }
cmd ::= SHOW SCORES. { setDCLSQLElems(pInfo, SHOW_SCORES, 0); }
cmd ::= SHOW GRANTS. { setDCLSQLElems(pInfo, SHOW_GRANTS, 0); }
cmd ::= SHOW VNODES. { setDCLSQLElems(pInfo, SHOW_VNODES, 0); }
cmd ::= SHOW VNODES IPTOKEN(X). { setDCLSQLElems(pInfo, SHOW_VNODES, 1, &X); }
%type dbPrefix {SSQLToken}
dbPrefix(A) ::=. {A.n = 0;}
dbPrefix(A) ::= ids(X) DOT. {A = X; }
......@@ -658,4 +661,4 @@ cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL.
\ No newline at end of file
SPREAD TWA INTERP LAST_ROW NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL.
......@@ -158,6 +158,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_CONNS,
TSDB_MGMT_TABLE_SCORES,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_MAX,
};
......@@ -224,7 +225,7 @@ typedef struct {
char meterId[TSDB_UNI_LEN];
uint16_t port; // for UDP only
char empty[1];
char msgType;
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} STaosHeader;
......
......@@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option);
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
#ifdef __cplusplus
}
#endif
......
......@@ -72,145 +72,144 @@
#define TK_CONFIGS 54
#define TK_SCORES 55
#define TK_GRANTS 56
#define TK_DOT 57
#define TK_TABLES 58
#define TK_STABLES 59
#define TK_VGROUPS 60
#define TK_DROP 61
#define TK_TABLE 62
#define TK_DATABASE 63
#define TK_DNODE 64
#define TK_IPTOKEN 65
#define TK_USER 66
#define TK_ACCOUNT 67
#define TK_USE 68
#define TK_DESCRIBE 69
#define TK_ALTER 70
#define TK_PASS 71
#define TK_PRIVILEGE 72
#define TK_LOCAL 73
#define TK_IF 74
#define TK_EXISTS 75
#define TK_CREATE 76
#define TK_PPS 77
#define TK_TSERIES 78
#define TK_DBS 79
#define TK_STORAGE 80
#define TK_QTIME 81
#define TK_CONNS 82
#define TK_STATE 83
#define TK_KEEP 84
#define TK_CACHE 85
#define TK_REPLICA 86
#define TK_DAYS 87
#define TK_ROWS 88
#define TK_ABLOCKS 89
#define TK_TBLOCKS 90
#define TK_CTIME 91
#define TK_CLOG 92
#define TK_COMP 93
#define TK_PRECISION 94
#define TK_LP 95
#define TK_RP 96
#define TK_TAGS 97
#define TK_USING 98
#define TK_AS 99
#define TK_COMMA 100
#define TK_NULL 101
#define TK_SELECT 102
#define TK_FROM 103
#define TK_VARIABLE 104
#define TK_INTERVAL 105
#define TK_FILL 106
#define TK_SLIDING 107
#define TK_ORDER 108
#define TK_BY 109
#define TK_ASC 110
#define TK_DESC 111
#define TK_GROUP 112
#define TK_HAVING 113
#define TK_LIMIT 114
#define TK_OFFSET 115
#define TK_SLIMIT 116
#define TK_SOFFSET 117
#define TK_WHERE 118
#define TK_NOW 119
#define TK_INSERT 120
#define TK_INTO 121
#define TK_VALUES 122
#define TK_RESET 123
#define TK_QUERY 124
#define TK_ADD 125
#define TK_COLUMN 126
#define TK_TAG 127
#define TK_CHANGE 128
#define TK_SET 129
#define TK_KILL 130
#define TK_CONNECTION 131
#define TK_COLON 132
#define TK_STREAM 133
#define TK_ABORT 134
#define TK_AFTER 135
#define TK_ATTACH 136
#define TK_BEFORE 137
#define TK_BEGIN 138
#define TK_CASCADE 139
#define TK_CLUSTER 140
#define TK_CONFLICT 141
#define TK_COPY 142
#define TK_DEFERRED 143
#define TK_DELIMITERS 144
#define TK_DETACH 145
#define TK_EACH 146
#define TK_END 147
#define TK_EXPLAIN 148
#define TK_FAIL 149
#define TK_FOR 150
#define TK_IGNORE 151
#define TK_IMMEDIATE 152
#define TK_INITIALLY 153
#define TK_INSTEAD 154
#define TK_MATCH 155
#define TK_KEY 156
#define TK_OF 157
#define TK_RAISE 158
#define TK_REPLACE 159
#define TK_RESTRICT 160
#define TK_ROW 161
#define TK_STATEMENT 162
#define TK_TRIGGER 163
#define TK_VIEW 164
#define TK_ALL 165
#define TK_COUNT 166
#define TK_SUM 167
#define TK_AVG 168
#define TK_MIN 169
#define TK_MAX 170
#define TK_FIRST 171
#define TK_LAST 172
#define TK_TOP 173
#define TK_BOTTOM 174
#define TK_STDDEV 175
#define TK_PERCENTILE 176
#define TK_APERCENTILE 177
#define TK_LEASTSQUARES 178
#define TK_HISTOGRAM 179
#define TK_DIFF 180
#define TK_SPREAD 181
#define TK_TWA 182
#define TK_INTERP 183
#define TK_LAST_ROW 184
#define TK_SEMI 185
#define TK_NONE 186
#define TK_PREV 187
#define TK_LINEAR 188
#define TK_IMPORT 189
#define TK_METRIC 190
#define TK_TBNAME 191
#define TK_JOIN 192
#define TK_METRICS 193
#define TK_STABLE 194
#define TK_VNODES 57
#define TK_IPTOKEN 58
#define TK_DOT 59
#define TK_TABLES 60
#define TK_STABLES 61
#define TK_VGROUPS 62
#define TK_DROP 63
#define TK_TABLE 64
#define TK_DATABASE 65
#define TK_DNODE 66
#define TK_USER 67
#define TK_ACCOUNT 68
#define TK_USE 69
#define TK_DESCRIBE 70
#define TK_ALTER 71
#define TK_PASS 72
#define TK_PRIVILEGE 73
#define TK_LOCAL 74
#define TK_IF 75
#define TK_EXISTS 76
#define TK_CREATE 77
#define TK_PPS 78
#define TK_TSERIES 79
#define TK_DBS 80
#define TK_STORAGE 81
#define TK_QTIME 82
#define TK_CONNS 83
#define TK_STATE 84
#define TK_KEEP 85
#define TK_CACHE 86
#define TK_REPLICA 87
#define TK_DAYS 88
#define TK_ROWS 89
#define TK_ABLOCKS 90
#define TK_TBLOCKS 91
#define TK_CTIME 92
#define TK_CLOG 93
#define TK_COMP 94
#define TK_PRECISION 95
#define TK_LP 96
#define TK_RP 97
#define TK_TAGS 98
#define TK_USING 99
#define TK_AS 100
#define TK_COMMA 101
#define TK_NULL 102
#define TK_SELECT 103
#define TK_FROM 104
#define TK_VARIABLE 105
#define TK_INTERVAL 106
#define TK_FILL 107
#define TK_SLIDING 108
#define TK_ORDER 109
#define TK_BY 110
#define TK_ASC 111
#define TK_DESC 112
#define TK_GROUP 113
#define TK_HAVING 114
#define TK_LIMIT 115
#define TK_OFFSET 116
#define TK_SLIMIT 117
#define TK_SOFFSET 118
#define TK_WHERE 119
#define TK_NOW 120
#define TK_INSERT 121
#define TK_INTO 122
#define TK_VALUES 123
#define TK_RESET 124
#define TK_QUERY 125
#define TK_ADD 126
#define TK_COLUMN 127
#define TK_TAG 128
#define TK_CHANGE 129
#define TK_SET 130
#define TK_KILL 131
#define TK_CONNECTION 132
#define TK_COLON 133
#define TK_STREAM 134
#define TK_ABORT 135
#define TK_AFTER 136
#define TK_ATTACH 137
#define TK_BEFORE 138
#define TK_BEGIN 139
#define TK_CASCADE 140
#define TK_CLUSTER 141
#define TK_CONFLICT 142
#define TK_COPY 143
#define TK_DEFERRED 144
#define TK_DELIMITERS 145
#define TK_DETACH 146
#define TK_EACH 147
#define TK_END 148
#define TK_EXPLAIN 149
#define TK_FAIL 150
#define TK_FOR 151
#define TK_IGNORE 152
#define TK_IMMEDIATE 153
#define TK_INITIALLY 154
#define TK_INSTEAD 155
#define TK_MATCH 156
#define TK_KEY 157
#define TK_OF 158
#define TK_RAISE 159
#define TK_REPLACE 160
#define TK_RESTRICT 161
#define TK_ROW 162
#define TK_STATEMENT 163
#define TK_TRIGGER 164
#define TK_VIEW 165
#define TK_ALL 166
#define TK_COUNT 167
#define TK_SUM 168
#define TK_AVG 169
#define TK_MIN 170
#define TK_MAX 171
#define TK_FIRST 172
#define TK_LAST 173
#define TK_TOP 174
#define TK_BOTTOM 175
#define TK_STDDEV 176
#define TK_PERCENTILE 177
#define TK_APERCENTILE 178
#define TK_LEASTSQUARES 179
#define TK_HISTOGRAM 180
#define TK_DIFF 181
#define TK_SPREAD 182
#define TK_TWA 183
#define TK_INTERP 184
#define TK_LAST_ROW 185
#define TK_SEMI 186
#define TK_NONE 187
#define TK_PREV 188
#define TK_LINEAR 189
#define TK_IMPORT 190
#define TK_METRIC 191
#define TK_TBNAME 192
#define TK_JOIN 193
#define TK_METRICS 194
#define TK_STABLE 195
#endif
......@@ -14,7 +14,6 @@
*/
#include "os.h"
#include "shash.h"
#include "taosmsg.h"
#include "tidpool.h"
......@@ -30,6 +29,7 @@
#include "ttimer.h"
#include "tudp.h"
#include "tutil.h"
#include "lz4.h"
#pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
......@@ -50,8 +50,7 @@ typedef struct {
char encrypt;
uint8_t secret[TSDB_KEY_LEN];
uint8_t ckey[TSDB_KEY_LEN];
uint16_t localPort; // for UDP only
uint16_t localPort; // for UDP only
uint32_t peerUid;
uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port
......@@ -66,7 +65,7 @@ typedef struct {
void * chandle; // handle passed by TCP/UDP connection layer
void * ahandle; // handle returned by upper app layter
int retry;
int tretry; // total retry
int tretry; // total retry
void * pTimer;
void * pIdleTimer;
char * pRspMsg;
......@@ -79,7 +78,7 @@ typedef struct {
typedef struct {
int sessions;
void * qhandle; // for scheduler
void * qhandle; // for scheduler
SRpcConn * connList;
void * idPool;
void * tmrCtrl;
......@@ -94,11 +93,11 @@ typedef struct rpc_server {
int mask;
int numOfChanns;
int numOfThreads;
int idMgmt; // ID management method
int idMgmt; // ID management method
int type;
int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection
int idleTime; // milliseconds;
int noFree; // do not free the request msg when rsp is received
int index; // for UDP server, next thread for new connection
uint16_t localPort;
char label[12];
void *(*fp)(char *, void *ahandle, void *thandle);
......@@ -107,8 +106,7 @@ typedef struct rpc_server {
SRpcChann *channList;
} STaosRpc;
int tsRpcProgressTime = 10; // milliseocnds
int tsRpcProgressTime = 10; // milliseocnds
// not configurable
int tsRpcMaxRetry;
......@@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg);
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
int32_t overhead = sizeof(int32_t) * 2;
int32_t finalLen = 0;
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen < contLen - overhead) {
//tDump(pCont, contLen);
int32_t *pLen = (int32_t *)pCont;
*pLen = 0; // first 4 bytes must be zero
pLen = (int32_t *)(pCont + sizeof(int32_t));
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1;
tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen);
finalLen = compLen + overhead;
//tDump(pCont, contLen);
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
int overhead = sizeof(int32_t) * 2;
if (pHeader->comp == 0) {
pSchedMsg->msg = (char *)(&(pHeader->destId));
return pHeader;
}
// decompress the content
assert(GET_INT32_VAL(pHeader->content) == 0);
// contLen is original message length before compression applied
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
// prepare the temporary buffer to decompress message
char *buf = malloc(sizeof(STaosHeader) + contLen);
//tDump(pHeader->content, msgLen);
if (buf) {
int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader),
msgLen - overhead, contLen);
memcpy(buf, pHeader, sizeof(STaosHeader));
free(pHeader); // free the compressed message buffer
STaosHeader* pNewHeader = (STaosHeader *) buf;
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
assert(originalLen == contLen);
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
//tDump(pHeader->content, contLen);
return pNewHeader;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
pSchedMsg->msg = NULL;
}
}
char *taosBuildReqHeader(void *param, char type, char *msg) {
STaosHeader *pHeader;
SRpcConn * pConn = (SRpcConn *)param;
......@@ -1074,8 +1155,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
if (code != 0) {
// parsing error
if (pHeader->msgType & 1) {
if (pHeader->msgType & 1U) {
memset(pReply, 0, sizeof(pReply));
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
......@@ -1090,17 +1172,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
// parsing OK
// internal communication is based on TAOS protocol, a trick here to make it efficient
pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg);
if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest);
if (pHeader->spi) msgLen -= sizeof(STaosDigest);
msgLen -= (int)sizeof(STaosHeader);
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
schedMsg.msg = NULL; // connection shall be closed
} else {
schedMsg.msg = (char *)(&(pHeader->destId));
// memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen);
pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
}
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
}
......@@ -1132,9 +1214,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
pChann = pServer->channList + pConn->chann;
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
msg = (char *)pHeader;
msgLen = contLen + (int32_t)sizeof(STaosHeader);
if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
contLen = taosCompressRpcMsg(pCont, contLen);
msgLen = contLen + (int32_t)sizeof(STaosHeader);
if (pConn->spi) {
// add auth part
......@@ -1151,7 +1236,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
pthread_mutex_lock(&pChann->mutex);
msgType = pHeader->msgType;
if ((msgType & 1) == 0) {
if ((msgType & 1U) == 0) {
// response
pConn->inType = 0;
tfree(pConn->pRspMsg);
......
......@@ -410,6 +410,9 @@ int mgmtRetrieveScores(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int grantGetGrantsMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int grantRetrieveGrants(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
// dnode balance api
int mgmtInitBalance();
void mgmtCleanupBalance();
......
......@@ -389,3 +389,119 @@ int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
pShow->numOfReads += numOfRows;
return numOfRows;
}
int mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 12;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 12;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sync status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// TODO: if other thread drop dnode ????
SDnodeObj *pDnode = NULL;
if (pShow->payloadLen > 0 ) {
uint32_t ip = ip2uint(pShow->payload);
pDnode = mgmtGetDnode(ip);
if (NULL == pDnode) {
return TSDB_CODE_NODE_OFFLINE;
}
pShow->numOfRows = pDnode->openVnodes;
pShow->pNode = pDnode;
} else {
while (true) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
pShow->numOfRows += pDnode->openVnodes;
if (0 == pShow->numOfRows) return TSDB_CODE_NODE_OFFLINE;
}
pShow->pNode = NULL;
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int cols = 0;
char ipstr[20];
if (0 == rows) return 0;
if (pShow->payloadLen) {
// output the vnodes info of the designated dnode. And output all vnodes of this dnode, instead of rows (max 100)
pDnode = (SDnodeObj *)(pShow->pNode);
if (pDnode != NULL) {
SVnodeLoad* pVnode;
for (int i = 0 ; i < TSDB_MAX_VNODES; i++) {
pVnode = &pDnode->vload[i];
if (0 == pVnode->vgId) {
continue;
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVnode->vnode;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVnode->vgId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeStatusStr(pVnode->status));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeSyncStatusStr(pVnode->syncStatus));
cols++;
numOfRows++;
}
}
} else {
// TODO: output all vnodes of all dnodes
numOfRows = 0;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
......@@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
// send create message to the selected vnode servers
if (pCreate->numOfTags == 0) {
mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d",
mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
grantAddTimeSeries(pMeter->numOfColumns - 1);
......
......@@ -788,12 +788,14 @@ int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = {
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetMeterMeta, mgmtGetDnodeMeta,
mgmtGetMnodeMeta, mgmtGetVgroupMeta, mgmtGetMetricMeta, mgmtGetModuleMeta, mgmtGetQueryMeta,
mgmtGetStreamMeta, mgmtGetConfigMeta, mgmtGetConnsMeta, mgmtGetScoresMeta, grantGetGrantsMeta,
mgmtGetVnodeMeta,
};
int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = {
mgmtRetrieveAccts, mgmtRetrieveUsers, mgmtRetrieveDbs, mgmtRetrieveMeters, mgmtRetrieveDnodes,
mgmtRetrieveMnodes, mgmtRetrieveVgroups, mgmtRetrieveMetrics, mgmtRetrieveModules, mgmtRetrieveQueries,
mgmtRetrieveStreams, mgmtRetrieveConfigs, mgmtRetrieveConns, mgmtRetrieveScores, grantRetrieveGrants,
mgmtRetrieveVnodes,
};
int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) {
......
......@@ -2952,11 +2952,11 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
pVnodeFiles->dataFileSize = fstat.st_size;
if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
pVnodeFiles->lastFileSize = fstat.st_size;
// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
// pVnodeFiles->dataFileSize = fstat.st_size;
//
// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
// pVnodeFiles->lastFileSize = fstat.st_size;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
/* enforce kernel to preload data when the file is mapping */
......
......@@ -483,13 +483,9 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
}
tfree(pQuery->pGroupbyExpr);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
/*
* destory signature, in order to avoid the query process pass the object
* safety check
*/
//destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo);
}
......
......@@ -644,6 +644,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT,
0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE);
// socket type, udp by default
tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
......
......@@ -224,6 +224,7 @@ static SKeyword keywordTable[] = {
{"METRICS", TK_METRICS},
{"STABLE", TK_STABLE},
{"FILE", TK_FILE},
{"VNODES", TK_VNODES},
};
/* This is the hash table */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册