提交 bb659995 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/code_format

......@@ -140,7 +140,7 @@ extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
......
......@@ -152,166 +152,166 @@
#define TK_TABLES 134
#define TK_STABLES 135
#define TK_MNODES 136
#define TK_MODULES 137
#define TK_QNODES 138
#define TK_FUNCTIONS 139
#define TK_INDEXES 140
#define TK_ACCOUNTS 141
#define TK_APPS 142
#define TK_CONNECTIONS 143
#define TK_LICENCES 144
#define TK_GRANTS 145
#define TK_QUERIES 146
#define TK_SCORES 147
#define TK_TOPICS 148
#define TK_VARIABLES 149
#define TK_BNODES 150
#define TK_SNODES 151
#define TK_CLUSTER 152
#define TK_TRANSACTIONS 153
#define TK_DISTRIBUTED 154
#define TK_CONSUMERS 155
#define TK_SUBSCRIPTIONS 156
#define TK_VNODES 157
#define TK_LIKE 158
#define TK_INDEX 159
#define TK_FUNCTION 160
#define TK_INTERVAL 161
#define TK_TOPIC 162
#define TK_AS 163
#define TK_WITH 164
#define TK_META 165
#define TK_CONSUMER 166
#define TK_GROUP 167
#define TK_DESC 168
#define TK_DESCRIBE 169
#define TK_RESET 170
#define TK_QUERY 171
#define TK_CACHE 172
#define TK_EXPLAIN 173
#define TK_ANALYZE 174
#define TK_VERBOSE 175
#define TK_NK_BOOL 176
#define TK_RATIO 177
#define TK_NK_FLOAT 178
#define TK_OUTPUTTYPE 179
#define TK_AGGREGATE 180
#define TK_BUFSIZE 181
#define TK_STREAM 182
#define TK_INTO 183
#define TK_TRIGGER 184
#define TK_AT_ONCE 185
#define TK_WINDOW_CLOSE 186
#define TK_IGNORE 187
#define TK_EXPIRED 188
#define TK_SUBTABLE 189
#define TK_KILL 190
#define TK_CONNECTION 191
#define TK_TRANSACTION 192
#define TK_BALANCE 193
#define TK_VGROUP 194
#define TK_MERGE 195
#define TK_REDISTRIBUTE 196
#define TK_SPLIT 197
#define TK_DELETE 198
#define TK_INSERT 199
#define TK_NULL 200
#define TK_NK_QUESTION 201
#define TK_NK_ARROW 202
#define TK_ROWTS 203
#define TK_TBNAME 204
#define TK_QSTART 205
#define TK_QEND 206
#define TK_QDURATION 207
#define TK_WSTART 208
#define TK_WEND 209
#define TK_WDURATION 210
#define TK_IROWTS 211
#define TK_QTAGS 212
#define TK_CAST 213
#define TK_NOW 214
#define TK_TODAY 215
#define TK_TIMEZONE 216
#define TK_CLIENT_VERSION 217
#define TK_SERVER_VERSION 218
#define TK_SERVER_STATUS 219
#define TK_CURRENT_USER 220
#define TK_COUNT 221
#define TK_LAST_ROW 222
#define TK_CASE 223
#define TK_END 224
#define TK_WHEN 225
#define TK_THEN 226
#define TK_ELSE 227
#define TK_BETWEEN 228
#define TK_IS 229
#define TK_NK_LT 230
#define TK_NK_GT 231
#define TK_NK_LE 232
#define TK_NK_GE 233
#define TK_NK_NE 234
#define TK_MATCH 235
#define TK_NMATCH 236
#define TK_CONTAINS 237
#define TK_IN 238
#define TK_JOIN 239
#define TK_INNER 240
#define TK_SELECT 241
#define TK_DISTINCT 242
#define TK_WHERE 243
#define TK_PARTITION 244
#define TK_BY 245
#define TK_SESSION 246
#define TK_STATE_WINDOW 247
#define TK_SLIDING 248
#define TK_FILL 249
#define TK_VALUE 250
#define TK_NONE 251
#define TK_PREV 252
#define TK_LINEAR 253
#define TK_NEXT 254
#define TK_HAVING 255
#define TK_RANGE 256
#define TK_EVERY 257
#define TK_ORDER 258
#define TK_SLIMIT 259
#define TK_SOFFSET 260
#define TK_LIMIT 261
#define TK_OFFSET 262
#define TK_ASC 263
#define TK_NULLS 264
#define TK_ABORT 265
#define TK_AFTER 266
#define TK_ATTACH 267
#define TK_BEFORE 268
#define TK_BEGIN 269
#define TK_BITAND 270
#define TK_BITNOT 271
#define TK_BITOR 272
#define TK_BLOCKS 273
#define TK_CHANGE 274
#define TK_COMMA 275
#define TK_COMPACT 276
#define TK_CONCAT 277
#define TK_CONFLICT 278
#define TK_COPY 279
#define TK_DEFERRED 280
#define TK_DELIMITERS 281
#define TK_DETACH 282
#define TK_DIVIDE 283
#define TK_DOT 284
#define TK_EACH 285
#define TK_FAIL 286
#define TK_FILE 287
#define TK_FOR 288
#define TK_GLOB 289
#define TK_ID 290
#define TK_IMMEDIATE 291
#define TK_IMPORT 292
#define TK_INITIALLY 293
#define TK_INSTEAD 294
#define TK_ISNULL 295
#define TK_KEY 296
#define TK_QNODES 137
#define TK_FUNCTIONS 138
#define TK_INDEXES 139
#define TK_ACCOUNTS 140
#define TK_APPS 141
#define TK_CONNECTIONS 142
#define TK_LICENCES 143
#define TK_GRANTS 144
#define TK_QUERIES 145
#define TK_SCORES 146
#define TK_TOPICS 147
#define TK_VARIABLES 148
#define TK_BNODES 149
#define TK_SNODES 150
#define TK_CLUSTER 151
#define TK_TRANSACTIONS 152
#define TK_DISTRIBUTED 153
#define TK_CONSUMERS 154
#define TK_SUBSCRIPTIONS 155
#define TK_VNODES 156
#define TK_LIKE 157
#define TK_INDEX 158
#define TK_FUNCTION 159
#define TK_INTERVAL 160
#define TK_TOPIC 161
#define TK_AS 162
#define TK_WITH 163
#define TK_META 164
#define TK_CONSUMER 165
#define TK_GROUP 166
#define TK_DESC 167
#define TK_DESCRIBE 168
#define TK_RESET 169
#define TK_QUERY 170
#define TK_CACHE 171
#define TK_EXPLAIN 172
#define TK_ANALYZE 173
#define TK_VERBOSE 174
#define TK_NK_BOOL 175
#define TK_RATIO 176
#define TK_NK_FLOAT 177
#define TK_OUTPUTTYPE 178
#define TK_AGGREGATE 179
#define TK_BUFSIZE 180
#define TK_STREAM 181
#define TK_INTO 182
#define TK_TRIGGER 183
#define TK_AT_ONCE 184
#define TK_WINDOW_CLOSE 185
#define TK_IGNORE 186
#define TK_EXPIRED 187
#define TK_SUBTABLE 188
#define TK_KILL 189
#define TK_CONNECTION 190
#define TK_TRANSACTION 191
#define TK_BALANCE 192
#define TK_VGROUP 193
#define TK_MERGE 194
#define TK_REDISTRIBUTE 195
#define TK_SPLIT 196
#define TK_DELETE 197
#define TK_INSERT 198
#define TK_NULL 199
#define TK_NK_QUESTION 200
#define TK_NK_ARROW 201
#define TK_ROWTS 202
#define TK_TBNAME 203
#define TK_QSTART 204
#define TK_QEND 205
#define TK_QDURATION 206
#define TK_WSTART 207
#define TK_WEND 208
#define TK_WDURATION 209
#define TK_IROWTS 210
#define TK_QTAGS 211
#define TK_CAST 212
#define TK_NOW 213
#define TK_TODAY 214
#define TK_TIMEZONE 215
#define TK_CLIENT_VERSION 216
#define TK_SERVER_VERSION 217
#define TK_SERVER_STATUS 218
#define TK_CURRENT_USER 219
#define TK_COUNT 220
#define TK_LAST_ROW 221
#define TK_CASE 222
#define TK_END 223
#define TK_WHEN 224
#define TK_THEN 225
#define TK_ELSE 226
#define TK_BETWEEN 227
#define TK_IS 228
#define TK_NK_LT 229
#define TK_NK_GT 230
#define TK_NK_LE 231
#define TK_NK_GE 232
#define TK_NK_NE 233
#define TK_MATCH 234
#define TK_NMATCH 235
#define TK_CONTAINS 236
#define TK_IN 237
#define TK_JOIN 238
#define TK_INNER 239
#define TK_SELECT 240
#define TK_DISTINCT 241
#define TK_WHERE 242
#define TK_PARTITION 243
#define TK_BY 244
#define TK_SESSION 245
#define TK_STATE_WINDOW 246
#define TK_SLIDING 247
#define TK_FILL 248
#define TK_VALUE 249
#define TK_NONE 250
#define TK_PREV 251
#define TK_LINEAR 252
#define TK_NEXT 253
#define TK_HAVING 254
#define TK_RANGE 255
#define TK_EVERY 256
#define TK_ORDER 257
#define TK_SLIMIT 258
#define TK_SOFFSET 259
#define TK_LIMIT 260
#define TK_OFFSET 261
#define TK_ASC 262
#define TK_NULLS 263
#define TK_ABORT 264
#define TK_AFTER 265
#define TK_ATTACH 266
#define TK_BEFORE 267
#define TK_BEGIN 268
#define TK_BITAND 269
#define TK_BITNOT 270
#define TK_BITOR 271
#define TK_BLOCKS 272
#define TK_CHANGE 273
#define TK_COMMA 274
#define TK_COMPACT 275
#define TK_CONCAT 276
#define TK_CONFLICT 277
#define TK_COPY 278
#define TK_DEFERRED 279
#define TK_DELIMITERS 280
#define TK_DETACH 281
#define TK_DIVIDE 282
#define TK_DOT 283
#define TK_EACH 284
#define TK_FAIL 285
#define TK_FILE 286
#define TK_FOR 287
#define TK_GLOB 288
#define TK_ID 289
#define TK_IMMEDIATE 290
#define TK_IMPORT 291
#define TK_INITIALLY 292
#define TK_INSTEAD 293
#define TK_ISNULL 294
#define TK_KEY 295
#define TK_MODULES 296
#define TK_NK_BITNOT 297
#define TK_NK_SEMI 298
#define TK_NOTNULL 299
......
......@@ -99,6 +99,7 @@ typedef struct SScanLogicNode {
int8_t cacheLastMode;
bool hasNormalCols; // neither tag column nor primary key tag column
bool sortPrimaryKey;
bool igLastNull;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -115,6 +116,7 @@ typedef struct SAggLogicNode {
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
bool hasLastRow;
bool hasLast;
bool hasTimeLineFunc;
bool onlyHasKeepOrderFunc;
} SAggLogicNode;
......@@ -317,6 +319,7 @@ typedef struct SLastRowScanPhysiNode {
SScanPhysiNode scan;
SNodeList* pGroupTags;
bool groupSort;
bool ignoreNull;
} SLastRowScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
......
......@@ -291,6 +291,7 @@ typedef struct SSelectStmt {
bool hasTailFunc;
bool hasInterpFunc;
bool hasLastRowFunc;
bool hasLastFunc;
bool hasTimeLineFunc;
bool hasUdaf;
bool hasStateKey;
......
......@@ -82,6 +82,9 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
// the following is for client app ecurity only
char *user; // user name
......@@ -115,10 +118,9 @@ typedef struct {
} SRpcCtx;
int32_t rpcInit();
void rpcCleanup();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void rpcCloseImpl(void *);
void *rpcMallocCont(int32_t contLen);
......
......@@ -566,6 +566,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS TAOS_DEF_ERROR_CODE(0, 0x2663)
#define TSDB_CODE_PAR_NOT_SUPPORT_JOIN TAOS_DEF_ERROR_CODE(0, 0x2664)
#define TSDB_CODE_PAR_INVALID_TAGS_PC TAOS_DEF_ERROR_CODE(0, 0x2665)
#define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner
......
......@@ -71,7 +71,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, "
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
" elapsed:%.2f ms, "
"current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
......@@ -84,7 +85,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64,
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.planEnd - pRequest->metric.semanticEnd,
......@@ -144,6 +145,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
......
......@@ -1991,6 +1991,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.sessions = 16;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd";
clientRpc = rpcOpen(&rpcInit);
......
......@@ -224,18 +224,17 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dInfo(
"vgId:%d, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
" cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
"days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d, wal "
"fsync:%d level:%d retentionPeriod:%d retentionSize:%d rollPeriod:%d segSize:%d, hash method:%d begin:%u end:%u "
"prefix:%d surfix:%d replica:%d selfIndex:%d strict:%d",
req.vgId, req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
req.replica, req.selfIndex, req.strict);
dInfo("vgId:%d, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d strict:%d",
req.vgId, req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix,
req.hashSuffix, req.replica, req.selfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d fqdn:%s port:%u", req.vgId, req.replicas[i].id, req.replicas[i].fqdn,
req.replicas[i].port);
......
......@@ -277,6 +277,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
......
......@@ -657,7 +657,8 @@ int32_t udfdOpenClientRpc() {
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
......
......@@ -383,6 +383,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(groupSort);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igLastNull);
return TSDB_CODE_SUCCESS;
}
......
......@@ -95,6 +95,7 @@ SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode
SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createIdentifierValueNode(SAstCreateContext* pCxt, SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt);
SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
......
......@@ -393,7 +393,7 @@ cmd ::= SHOW db_name_cond_opt(A) TABLES like_pattern_opt(B).
cmd ::= SHOW db_name_cond_opt(A) STABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_STABLES_STMT, A, B, OP_TYPE_LIKE); }
cmd ::= SHOW db_name_cond_opt(A) VGROUPS. { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, A, NULL, OP_TYPE_LIKE); }
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT); }
cmd ::= SHOW MODULES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MODULES_STMT); }
//cmd ::= SHOW MODULES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MODULES_STMT); }
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT); }
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT); }
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, B, A, OP_TYPE_EQUAL); }
......@@ -425,15 +425,15 @@ cmd ::= SHOW VNODES NK_INTEGER(A).
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); }
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createIdentifierValueNode(pCxt, &B); }
like_pattern_opt(A) ::= . { A = NULL; }
like_pattern_opt(A) ::= LIKE NK_STRING(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
table_name_cond(A) ::= table_name(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
table_name_cond(A) ::= table_name(B). { A = createIdentifierValueNode(pCxt, &B); }
from_db_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
from_db_opt(A) ::= FROM db_name(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
from_db_opt(A) ::= FROM db_name(B). { A = createIdentifierValueNode(pCxt, &B); }
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)
......@@ -1043,5 +1043,5 @@ null_ordering_opt(A) ::= NULLS FIRST.
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
%fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA COMPACT CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT STRING
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT STRING
TIMES UPDATE VALUES VARIABLE VIEW WAL.
......@@ -310,6 +310,11 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
return (SNode*)val;
}
SNode* createIdentifierValueNode(SAstCreateContext* pCxt, SToken* pLiteral) {
trimEscape(pLiteral);
return createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, pLiteral);
}
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral) {
CHECK_PARSER_STATUS(pCxt);
SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
......
......@@ -97,7 +97,7 @@ typedef struct SCollectMetaKeyCxt {
typedef struct SCollectMetaKeyFromExprCxt {
SCollectMetaKeyCxt* pComCxt;
bool hasLastRow;
bool hasLastRowOrLast;
int32_t errCode;
} SCollectMetaKeyFromExprCxt;
......@@ -106,7 +106,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt);
static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFunctionNode* pFunc) {
switch (fmGetFuncType(pFunc->functionName)) {
case FUNCTION_TYPE_LAST_ROW:
pCxt->hasLastRow = true;
case FUNCTION_TYPE_LAST:
pCxt->hasLastRowOrLast = true;
break;
case FUNCTION_TYPE_UDF:
pCxt->errCode = reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
......@@ -126,6 +127,9 @@ static bool needGetTableIndex(SNode* pStmt) {
}
static int32_t collectMetaKeyFromInsTagsImpl(SCollectMetaKeyCxt* pCxt, SName* pName) {
if (0 == pName->type) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_DB_NAME_T == pName->type) {
return reserveDbVgInfoInCache(pName->acctId, pName->dbname, pCxt->pMetaCache);
}
......@@ -218,9 +222,9 @@ static int32_t reserveDbCfgForLastRow(SCollectMetaKeyCxt* pCxt, SNode* pTable) {
}
static int32_t collectMetaKeyFromSelect(SCollectMetaKeyCxt* pCxt, SSelectStmt* pStmt) {
SCollectMetaKeyFromExprCxt cxt = {.pComCxt = pCxt, .hasLastRow = false, .errCode = TSDB_CODE_SUCCESS};
SCollectMetaKeyFromExprCxt cxt = {.pComCxt = pCxt, .hasLastRowOrLast = false, .errCode = TSDB_CODE_SUCCESS};
nodesWalkSelectStmt(pStmt, SQL_CLAUSE_FROM, collectMetaKeyFromExprImpl, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode && cxt.hasLastRow) {
if (TSDB_CODE_SUCCESS == cxt.errCode && cxt.hasLastRowOrLast) {
cxt.errCode = reserveDbCfgForLastRow(pCxt, pStmt->pFromTable);
}
return cxt.errCode;
......
......@@ -721,6 +721,14 @@ static bool isTimeLineQuery(SNode* pStmt) {
}
}
static bool isGlobalTimeLineQuery(SNode* pStmt) {
if (!isTimeLineQuery(pStmt)) {
return false;
}
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
return NULL == pSelect->pPartitionByList || NULL != pSelect->pOrderByList;
}
static bool isPrimaryKeyImpl(SNode* pExpr) {
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
......@@ -1601,6 +1609,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId);
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
......@@ -2341,7 +2350,7 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
}
static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasLastRowFunc || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable)) {
if ((!pSelect->hasLastRowFunc && !pSelect->hasLastFunc) || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable)) {
return TSDB_CODE_SUCCESS;
}
......@@ -3012,8 +3021,9 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
return TSDB_CODE_SUCCESS;
}
static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) {
int32_t code = checkIntervalWindow(pCxt, pInterval);
static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow;
int32_t code = checkIntervalWindow(pCxt, pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = translateFill(pCxt, pSelect, pInterval);
}
......@@ -3056,6 +3066,12 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p
}
static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"STATE_WINDOW requires valid time series input");
}
SStateWindowNode* pState = (SStateWindowNode*)pSelect->pWindow;
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
......@@ -3064,7 +3080,14 @@ static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelec
return pCxt->errCode;
}
static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
static int32_t translateSessionWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"SESSION requires valid time series input");
}
SSessionWindowNode* pSession = (SSessionWindowNode*)pSelect->pWindow;
if ('y' == pSession->pGap->unit || 'n' == pSession->pGap->unit || 0 == pSession->pGap->datum.i) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_GAP);
}
......@@ -3079,9 +3102,9 @@ static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSe
case QUERY_NODE_STATE_WINDOW:
return translateStateWindow(pCxt, pSelect);
case QUERY_NODE_SESSION_WINDOW:
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
return translateSessionWindow(pCxt, pSelect);
case QUERY_NODE_INTERVAL_WINDOW:
return translateIntervalWindow(pCxt, pSelect, (SIntervalWindowNode*)pSelect->pWindow);
return translateIntervalWindow(pCxt, pSelect);
default:
break;
}
......@@ -5156,6 +5179,13 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
}
static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (pSelect->hasAggFuncs || pSelect->hasInterpFunc || pSelect->hasIndefiniteRowsFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
......@@ -5186,6 +5216,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
tNameGetFullDbName(&name, pReq->subDbName);
pCxt->pParseCxt->topicQuery = true;
code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = checkTopicQuery(pCxt, (SSelectStmt*)pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
......
此差异已折叠。
......@@ -130,12 +130,6 @@ TEST_F(ParserShowToUseTest, showMnodes) {
run("SHOW mnodes");
}
TEST_F(ParserShowToUseTest, showModules) {
useDb("root", "test");
run("SHOW modules");
}
TEST_F(ParserShowToUseTest, showQnodes) {
useDb("root", "test");
......
......@@ -514,6 +514,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->hasLast = pSelect->hasLastFunc;
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc;
pAgg->node.groupAction = getGroupAction(pCxt, pSelect);
......
......@@ -124,7 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
// *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
// *((bool*)pContext) =
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
*((bool*)pContext) = true;
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
}
......@@ -2195,14 +2196,16 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (!pAgg->hasLastRow || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || 0 == pScan->cacheLastMode ||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
// Only one of LAST and LASTROW can appear
if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
0 == pScan->cacheLastMode || IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
return false;
}
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType &&
// FUNCTION_TYPE_LAST != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) {
return false;
......@@ -2222,7 +2225,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
SNode* pNode = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType) {
if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
pFunc->functionName[len] = '\0';
int32_t code = fmGetFuncInfo(pFunc, NULL, 0);
......@@ -2231,9 +2234,12 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
}
}
}
pAgg->hasLastRow = false;
((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
pScan->scanType = SCAN_TYPE_LAST_ROW;
pScan->igLastNull = pAgg->hasLast ? true : false;
pAgg->hasLastRow = false;
pAgg->hasLast = false;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
......@@ -2405,8 +2411,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize}
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize}
};
// clang-format on
......
......@@ -521,6 +521,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
}
pScan->groupSort = pScanLogicNode->groupSort;
pScan->ignoreNull = pScanLogicNode->igLastNull;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
......
......@@ -200,15 +200,13 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transContFromHead(msg) (((char*)msg) + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
void transFreeMsg(void* msg);
//
typedef struct SConnBuffer {
char* buf;
int len;
......@@ -415,6 +413,10 @@ void transThreadOnce();
void transInit();
void transCleanup();
void transFreeMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len);
int32_t transDecompressMsg(char** msg, int32_t len);
int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(int32_t refMgt, void* p);
......
......@@ -16,9 +16,7 @@
#ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h"
#include "os.h"
#include "taoserror.h"
......@@ -34,8 +32,6 @@
extern "C" {
#endif
#ifdef USE_UV
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
......@@ -51,19 +47,20 @@ typedef struct {
char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
int index;
int index;
void* parent;
void* tcphandle; // returned handle from TCP initialization
int64_t refId;
TdThreadMutex mutex;
} SRpcInfo;
#endif // USE_LIBUV
#ifdef __cplusplus
}
#endif
......
......@@ -45,6 +45,10 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
}
pRpc->compressSize = pInit->compressSize;
pRpc->encryption = pInit->encryption;
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;
......@@ -130,9 +134,6 @@ void* rpcReallocCont(void* ptr, int32_t contLen) {
return st + TRANS_MSG_OVERHEAD;
}
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL);
}
......
......@@ -319,13 +319,18 @@ void cliHandleResp(SCliConn* conn) {
}
STransMsgHead* pHead = NULL;
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (msgLen <= 0) {
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
......@@ -374,7 +379,7 @@ void cliHandleResp(SCliConn* conn) {
STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
TMSG_INFO(pHead->msgType), conn->dst, conn->src, msgLen, tstrerror(transMsg.code));
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
......@@ -553,7 +558,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
arg->param2 = NULL;
STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
......@@ -772,20 +777,17 @@ void cliSend(SCliConn* pConn) {
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn);
}
STraceId* trace = &pMsg->info.traceId;
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
tDebug("no avaiable timer, create");
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
tDebug("no available timer, create a timer %p", timer);
uv_timer_init(pThrd->loop, timer);
}
timer->data = pConn;
......@@ -795,6 +797,13 @@ void cliSend(SCliConn* pConn) {
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
......@@ -1275,17 +1284,13 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
}
static FORCE_INLINE void doDelayTask(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
taosMemoryFree(arg);
cliHandleReq(pMsg, pThrd);
}
static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL;
cliDestroyConn(conn, true);
......
......@@ -23,52 +23,63 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
static int32_t instMgt;
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont);
bool succ = false;
int overhead = sizeof(STransCompMsg);
if (!NEEDTO_COMPRESSS_MSG(len)) {
return succ;
}
int32_t transCompressMsg(char* msg, int32_t len) {
int32_t ret = 0;
int compHdr = sizeof(STransCompMsg);
STransMsgHead* pHead = transHeadFromCont(msg);
char* buf = taosMemoryMalloc(len + overhead + 8); // 8 extra bytes
char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
*flen = len;
return succ;
ret = len;
return ret;
}
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (clen > 0 && clen < len - overhead) {
if (clen > 0 && clen < len - compHdr) {
STransCompMsg* pComp = (STransCompMsg*)msg;
pComp->reserved = 0;
pComp->contLen = htonl(len);
memcpy(msg + overhead, buf, clen);
memcpy(msg + compHdr, buf, clen);
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
*flen = clen + overhead;
succ = true;
ret = clen + compHdr;
pHead->comp = 1;
} else {
*flen = len;
succ = false;
ret = len;
pHead->comp = 0;
}
taosMemoryFree(buf);
return succ;
return ret;
}
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
// impl later
return false;
STransCompMsg* pComp = (STransCompMsg*)msg;
int32_t transDecompressMsg(char** msg, int32_t len) {
STransMsgHead* pHead = (STransMsgHead*)(*msg);
if (pHead->comp == 0) return 0;
char* pCont = transContFromHead(pHead);
STransCompMsg* pComp = (STransCompMsg*)pCont;
int32_t oriLen = htonl(pComp->contLen);
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
STransMsgHead* pNewHead = (STransMsgHead*)buf;
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
int overhead = sizeof(STransCompMsg);
int clen = 0;
return false;
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
taosMemoryFree(pHead);
*msg = buf;
if (decompLen != oriLen) {
return -1;
}
return 0;
}
void transFreeMsg(void* msg) {
......
......@@ -186,16 +186,22 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
STransMsgHead* msg = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
STransMsgHead* pHead = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
STransMsgHead* pHead = (STransMsgHead*)msg;
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
return false;
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
memcpy(pConn->user, pHead->user, strlen(pHead->user));
if (uvRecvReleaseReq(pConn, pHead)) {
......@@ -229,10 +235,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
transRefSrvHandle(pConn);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen);
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code);
}
// pHead->noResp = 1,
......@@ -399,17 +405,22 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code);
pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst;
STrans* pTransInst = pConn->pTransInst;
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)len);
}
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
pHead->msgLen = htonl(len);
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
wb->base = msg;
wb->base = (char*)pHead;
wb->len = len;
}
......@@ -1160,6 +1171,11 @@ _return2:
return -1;
}
int transSendResponse(const STransMsg* msg) {
if (msg->info.noResp) {
rpcFreeCont(msg->pCont);
tTrace("no need send resp");
return 0;
}
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
......@@ -1198,6 +1214,8 @@ int transRegisterMsg(const STransMsg* msg) {
ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = *msg;
tmsg.info.noResp = 1;
tmsg.info.refId = refId;
SWorkThrd* pThrd = exh->pThrd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册