提交 d2e2a658 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/3.0' into enh/crash_gen

...@@ -22,6 +22,7 @@ mac/ ...@@ -22,6 +22,7 @@ mac/
.mypy_cache .mypy_cache
*.tmp *.tmp
*.swp *.swp
*.swo
*.orig *.orig
src/connector/nodejs/node_modules/ src/connector/nodejs/node_modules/
src/connector/nodejs/out/ src/connector/nodejs/out/
......
此差异已折叠。
...@@ -3,6 +3,7 @@ title: 立即开始 ...@@ -3,6 +3,7 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询' description: '快速设置 TDengine 环境并体验其高效写入和查询'
--- ---
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](/reference/taosadapter) 提供 [RESTful 接口](/reference/rest-api)
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
......
...@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond { ...@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
int32_t type; // data block load type: int32_t type; // data block load type:
// int32_t numOfTWindows; STimeWindow twindows;
STimeWindow twindows; int64_t startVersion;
int64_t startVersion; int64_t endVersion;
int64_t endVersion;
} SQueryTableDataCond; } SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
......
...@@ -152,7 +152,10 @@ void taosCfgDynamicOptions(const char *option, const char *value); ...@@ -152,7 +152,10 @@ void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
struct SConfig *taosGetCfg(); struct SConfig *taosGetCfg();
int32_t taosSetCfg(SConfig *pCfg, char* name);
void taosSetAllDebugFlag(int32_t flag);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
int32_t taosSetCfg(SConfig *pCfg, char *name);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -748,6 +748,10 @@ typedef struct { ...@@ -748,6 +748,10 @@ typedef struct {
int8_t ignoreExist; int8_t ignoreExist;
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; // SRetention SArray* pRetensions; // SRetention
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SCreateDbReq; } SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
...@@ -1977,7 +1981,7 @@ typedef struct SVCreateTbReq { ...@@ -1977,7 +1981,7 @@ typedef struct SVCreateTbReq {
union { union {
struct { struct {
char* name; // super table name char* name; // super table name
uint8_t tagNum; uint8_t tagNum;
tb_uid_t suid; tb_uid_t suid;
SArray* tagName; SArray* tagName;
uint8_t* pTag; uint8_t* pTag;
......
...@@ -16,261 +16,265 @@ ...@@ -16,261 +16,265 @@
#ifndef _TD_COMMON_TOKEN_H_ #ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_ #define _TD_COMMON_TOKEN_H_
#define TK_OR 1 #define TK_OR 1
#define TK_AND 2 #define TK_AND 2
#define TK_UNION 3 #define TK_UNION 3
#define TK_ALL 4 #define TK_ALL 4
#define TK_MINUS 5 #define TK_MINUS 5
#define TK_EXCEPT 6 #define TK_EXCEPT 6
#define TK_INTERSECT 7 #define TK_INTERSECT 7
#define TK_NK_BITAND 8 #define TK_NK_BITAND 8
#define TK_NK_BITOR 9 #define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10 #define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11 #define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12 #define TK_NK_PLUS 12
#define TK_NK_MINUS 13 #define TK_NK_MINUS 13
#define TK_NK_STAR 14 #define TK_NK_STAR 14
#define TK_NK_SLASH 15 #define TK_NK_SLASH 15
#define TK_NK_REM 16 #define TK_NK_REM 16
#define TK_NK_CONCAT 17 #define TK_NK_CONCAT 17
#define TK_CREATE 18 #define TK_CREATE 18
#define TK_ACCOUNT 19 #define TK_ACCOUNT 19
#define TK_NK_ID 20 #define TK_NK_ID 20
#define TK_PASS 21 #define TK_PASS 21
#define TK_NK_STRING 22 #define TK_NK_STRING 22
#define TK_ALTER 23 #define TK_ALTER 23
#define TK_PPS 24 #define TK_PPS 24
#define TK_TSERIES 25 #define TK_TSERIES 25
#define TK_STORAGE 26 #define TK_STORAGE 26
#define TK_STREAMS 27 #define TK_STREAMS 27
#define TK_QTIME 28 #define TK_QTIME 28
#define TK_DBS 29 #define TK_DBS 29
#define TK_USERS 30 #define TK_USERS 30
#define TK_CONNS 31 #define TK_CONNS 31
#define TK_STATE 32 #define TK_STATE 32
#define TK_USER 33 #define TK_USER 33
#define TK_ENABLE 34 #define TK_ENABLE 34
#define TK_NK_INTEGER 35 #define TK_NK_INTEGER 35
#define TK_SYSINFO 36 #define TK_SYSINFO 36
#define TK_DROP 37 #define TK_DROP 37
#define TK_GRANT 38 #define TK_GRANT 38
#define TK_ON 39 #define TK_ON 39
#define TK_TO 40 #define TK_TO 40
#define TK_REVOKE 41 #define TK_REVOKE 41
#define TK_FROM 42 #define TK_FROM 42
#define TK_NK_COMMA 43 #define TK_NK_COMMA 43
#define TK_READ 44 #define TK_READ 44
#define TK_WRITE 45 #define TK_WRITE 45
#define TK_NK_DOT 46 #define TK_NK_DOT 46
#define TK_DNODE 47 #define TK_DNODE 47
#define TK_PORT 48 #define TK_PORT 48
#define TK_DNODES 49 #define TK_DNODES 49
#define TK_NK_IPTOKEN 50 #define TK_NK_IPTOKEN 50
#define TK_LOCAL 51 #define TK_LOCAL 51
#define TK_QNODE 52 #define TK_QNODE 52
#define TK_BNODE 53 #define TK_BNODE 53
#define TK_SNODE 54 #define TK_SNODE 54
#define TK_MNODE 55 #define TK_MNODE 55
#define TK_DATABASE 56 #define TK_DATABASE 56
#define TK_USE 57 #define TK_USE 57
#define TK_FLUSH 58 #define TK_FLUSH 58
#define TK_TRIM 59 #define TK_TRIM 59
#define TK_IF 60 #define TK_IF 60
#define TK_NOT 61 #define TK_NOT 61
#define TK_EXISTS 62 #define TK_EXISTS 62
#define TK_BUFFER 63 #define TK_BUFFER 63
#define TK_CACHEMODEL 64 #define TK_CACHEMODEL 64
#define TK_CACHESIZE 65 #define TK_CACHESIZE 65
#define TK_COMP 66 #define TK_COMP 66
#define TK_DURATION 67 #define TK_DURATION 67
#define TK_NK_VARIABLE 68 #define TK_NK_VARIABLE 68
#define TK_FSYNC 69 #define TK_FSYNC 69
#define TK_MAXROWS 70 #define TK_MAXROWS 70
#define TK_MINROWS 71 #define TK_MINROWS 71
#define TK_KEEP 72 #define TK_KEEP 72
#define TK_PAGES 73 #define TK_PAGES 73
#define TK_PAGESIZE 74 #define TK_PAGESIZE 74
#define TK_PRECISION 75 #define TK_PRECISION 75
#define TK_REPLICA 76 #define TK_REPLICA 76
#define TK_STRICT 77 #define TK_STRICT 77
#define TK_WAL 78 #define TK_WAL 78
#define TK_VGROUPS 79 #define TK_VGROUPS 79
#define TK_SINGLE_STABLE 80 #define TK_SINGLE_STABLE 80
#define TK_RETENTIONS 81 #define TK_RETENTIONS 81
#define TK_SCHEMALESS 82 #define TK_SCHEMALESS 82
#define TK_NK_COLON 83 #define TK_WAL_RETENTION_PERIOD 83
#define TK_TABLE 84 #define TK_WAL_RETENTION_SIZE 84
#define TK_NK_LP 85 #define TK_WAL_ROLL_PERIOD 85
#define TK_NK_RP 86 #define TK_WAL_SEGMENT_SIZE 86
#define TK_STABLE 87 #define TK_NK_COLON 87
#define TK_ADD 88 #define TK_TABLE 88
#define TK_COLUMN 89 #define TK_NK_LP 89
#define TK_MODIFY 90 #define TK_NK_RP 90
#define TK_RENAME 91 #define TK_STABLE 91
#define TK_TAG 92 #define TK_ADD 92
#define TK_SET 93 #define TK_COLUMN 93
#define TK_NK_EQ 94 #define TK_MODIFY 94
#define TK_USING 95 #define TK_RENAME 95
#define TK_TAGS 96 #define TK_TAG 96
#define TK_COMMENT 97 #define TK_SET 97
#define TK_BOOL 98 #define TK_NK_EQ 98
#define TK_TINYINT 99 #define TK_USING 99
#define TK_SMALLINT 100 #define TK_TAGS 100
#define TK_INT 101 #define TK_COMMENT 101
#define TK_INTEGER 102 #define TK_BOOL 102
#define TK_BIGINT 103 #define TK_TINYINT 103
#define TK_FLOAT 104 #define TK_SMALLINT 104
#define TK_DOUBLE 105 #define TK_INT 105
#define TK_BINARY 106 #define TK_INTEGER 106
#define TK_TIMESTAMP 107 #define TK_BIGINT 107
#define TK_NCHAR 108 #define TK_FLOAT 108
#define TK_UNSIGNED 109 #define TK_DOUBLE 109
#define TK_JSON 110 #define TK_BINARY 110
#define TK_VARCHAR 111 #define TK_TIMESTAMP 111
#define TK_MEDIUMBLOB 112 #define TK_NCHAR 112
#define TK_BLOB 113 #define TK_UNSIGNED 113
#define TK_VARBINARY 114 #define TK_JSON 114
#define TK_DECIMAL 115 #define TK_VARCHAR 115
#define TK_MAX_DELAY 116 #define TK_MEDIUMBLOB 116
#define TK_WATERMARK 117 #define TK_BLOB 117
#define TK_ROLLUP 118 #define TK_VARBINARY 118
#define TK_TTL 119 #define TK_DECIMAL 119
#define TK_SMA 120 #define TK_MAX_DELAY 120
#define TK_FIRST 121 #define TK_WATERMARK 121
#define TK_LAST 122 #define TK_ROLLUP 122
#define TK_SHOW 123 #define TK_TTL 123
#define TK_DATABASES 124 #define TK_SMA 124
#define TK_TABLES 125 #define TK_FIRST 125
#define TK_STABLES 126 #define TK_LAST 126
#define TK_MNODES 127 #define TK_SHOW 127
#define TK_MODULES 128 #define TK_DATABASES 128
#define TK_QNODES 129 #define TK_TABLES 129
#define TK_FUNCTIONS 130 #define TK_STABLES 130
#define TK_INDEXES 131 #define TK_MNODES 131
#define TK_ACCOUNTS 132 #define TK_MODULES 132
#define TK_APPS 133 #define TK_QNODES 133
#define TK_CONNECTIONS 134 #define TK_FUNCTIONS 134
#define TK_LICENCE 135 #define TK_INDEXES 135
#define TK_GRANTS 136 #define TK_ACCOUNTS 136
#define TK_QUERIES 137 #define TK_APPS 137
#define TK_SCORES 138 #define TK_CONNECTIONS 138
#define TK_TOPICS 139 #define TK_LICENCE 139
#define TK_VARIABLES 140 #define TK_GRANTS 140
#define TK_BNODES 141 #define TK_QUERIES 141
#define TK_SNODES 142 #define TK_SCORES 142
#define TK_CLUSTER 143 #define TK_TOPICS 143
#define TK_TRANSACTIONS 144 #define TK_VARIABLES 144
#define TK_DISTRIBUTED 145 #define TK_BNODES 145
#define TK_CONSUMERS 146 #define TK_SNODES 146
#define TK_SUBSCRIPTIONS 147 #define TK_CLUSTER 147
#define TK_LIKE 148 #define TK_TRANSACTIONS 148
#define TK_INDEX 149 #define TK_DISTRIBUTED 149
#define TK_FUNCTION 150 #define TK_CONSUMERS 150
#define TK_INTERVAL 151 #define TK_SUBSCRIPTIONS 151
#define TK_TOPIC 152 #define TK_LIKE 152
#define TK_AS 153 #define TK_INDEX 153
#define TK_WITH 154 #define TK_FUNCTION 154
#define TK_META 155 #define TK_INTERVAL 155
#define TK_CONSUMER 156 #define TK_TOPIC 156
#define TK_GROUP 157 #define TK_AS 157
#define TK_DESC 158 #define TK_WITH 158
#define TK_DESCRIBE 159 #define TK_META 159
#define TK_RESET 160 #define TK_CONSUMER 160
#define TK_QUERY 161 #define TK_GROUP 161
#define TK_CACHE 162 #define TK_DESC 162
#define TK_EXPLAIN 163 #define TK_DESCRIBE 163
#define TK_ANALYZE 164 #define TK_RESET 164
#define TK_VERBOSE 165 #define TK_QUERY 165
#define TK_NK_BOOL 166 #define TK_CACHE 166
#define TK_RATIO 167 #define TK_EXPLAIN 167
#define TK_NK_FLOAT 168 #define TK_ANALYZE 168
#define TK_COMPACT 169 #define TK_VERBOSE 169
#define TK_VNODES 170 #define TK_NK_BOOL 170
#define TK_IN 171 #define TK_RATIO 171
#define TK_OUTPUTTYPE 172 #define TK_NK_FLOAT 172
#define TK_AGGREGATE 173 #define TK_COMPACT 173
#define TK_BUFSIZE 174 #define TK_VNODES 174
#define TK_STREAM 175 #define TK_IN 175
#define TK_INTO 176 #define TK_OUTPUTTYPE 176
#define TK_TRIGGER 177 #define TK_AGGREGATE 177
#define TK_AT_ONCE 178 #define TK_BUFSIZE 178
#define TK_WINDOW_CLOSE 179 #define TK_STREAM 179
#define TK_IGNORE 180 #define TK_INTO 180
#define TK_EXPIRED 181 #define TK_TRIGGER 181
#define TK_KILL 182 #define TK_AT_ONCE 182
#define TK_CONNECTION 183 #define TK_WINDOW_CLOSE 183
#define TK_TRANSACTION 184 #define TK_IGNORE 184
#define TK_BALANCE 185 #define TK_EXPIRED 185
#define TK_VGROUP 186 #define TK_KILL 186
#define TK_MERGE 187 #define TK_CONNECTION 187
#define TK_REDISTRIBUTE 188 #define TK_TRANSACTION 188
#define TK_SPLIT 189 #define TK_BALANCE 189
#define TK_SYNCDB 190 #define TK_VGROUP 190
#define TK_DELETE 191 #define TK_MERGE 191
#define TK_INSERT 192 #define TK_REDISTRIBUTE 192
#define TK_NULL 193 #define TK_SPLIT 193
#define TK_NK_QUESTION 194 #define TK_SYNCDB 194
#define TK_NK_ARROW 195 #define TK_DELETE 195
#define TK_ROWTS 196 #define TK_INSERT 196
#define TK_TBNAME 197 #define TK_NULL 197
#define TK_QSTART 198 #define TK_NK_QUESTION 198
#define TK_QEND 199 #define TK_NK_ARROW 199
#define TK_QDURATION 200 #define TK_ROWTS 200
#define TK_WSTART 201 #define TK_TBNAME 201
#define TK_WEND 202 #define TK_QSTART 202
#define TK_WDURATION 203 #define TK_QEND 203
#define TK_CAST 204 #define TK_QDURATION 204
#define TK_NOW 205 #define TK_WSTART 205
#define TK_TODAY 206 #define TK_WEND 206
#define TK_TIMEZONE 207 #define TK_WDURATION 207
#define TK_CLIENT_VERSION 208 #define TK_CAST 208
#define TK_SERVER_VERSION 209 #define TK_NOW 209
#define TK_SERVER_STATUS 210 #define TK_TODAY 210
#define TK_CURRENT_USER 211 #define TK_TIMEZONE 211
#define TK_COUNT 212 #define TK_CLIENT_VERSION 212
#define TK_LAST_ROW 213 #define TK_SERVER_VERSION 213
#define TK_BETWEEN 214 #define TK_SERVER_STATUS 214
#define TK_IS 215 #define TK_CURRENT_USER 215
#define TK_NK_LT 216 #define TK_COUNT 216
#define TK_NK_GT 217 #define TK_LAST_ROW 217
#define TK_NK_LE 218 #define TK_BETWEEN 218
#define TK_NK_GE 219 #define TK_IS 219
#define TK_NK_NE 220 #define TK_NK_LT 220
#define TK_MATCH 221 #define TK_NK_GT 221
#define TK_NMATCH 222 #define TK_NK_LE 222
#define TK_CONTAINS 223 #define TK_NK_GE 223
#define TK_JOIN 224 #define TK_NK_NE 224
#define TK_INNER 225 #define TK_MATCH 225
#define TK_SELECT 226 #define TK_NMATCH 226
#define TK_DISTINCT 227 #define TK_CONTAINS 227
#define TK_WHERE 228 #define TK_JOIN 228
#define TK_PARTITION 229 #define TK_INNER 229
#define TK_BY 230 #define TK_SELECT 230
#define TK_SESSION 231 #define TK_DISTINCT 231
#define TK_STATE_WINDOW 232 #define TK_WHERE 232
#define TK_SLIDING 233 #define TK_PARTITION 233
#define TK_FILL 234 #define TK_BY 234
#define TK_VALUE 235 #define TK_SESSION 235
#define TK_NONE 236 #define TK_STATE_WINDOW 236
#define TK_PREV 237 #define TK_SLIDING 237
#define TK_LINEAR 238 #define TK_FILL 238
#define TK_NEXT 239 #define TK_VALUE 239
#define TK_HAVING 240 #define TK_NONE 240
#define TK_RANGE 241 #define TK_PREV 241
#define TK_EVERY 242 #define TK_LINEAR 242
#define TK_ORDER 243 #define TK_NEXT 243
#define TK_SLIMIT 244 #define TK_HAVING 244
#define TK_SOFFSET 245 #define TK_RANGE 245
#define TK_LIMIT 246 #define TK_EVERY 246
#define TK_OFFSET 247 #define TK_ORDER 247
#define TK_ASC 248 #define TK_SLIMIT 248
#define TK_NULLS 249 #define TK_SOFFSET 249
#define TK_ID 250 #define TK_LIMIT 250
#define TK_NK_BITNOT 251 #define TK_OFFSET 251
#define TK_VALUES 252 #define TK_ASC 252
#define TK_IMPORT 253 #define TK_NULLS 253
#define TK_NK_SEMI 254 #define TK_ID 254
#define TK_FILE 255 #define TK_NK_BITNOT 255
#define TK_VALUES 256
#define TK_IMPORT 257
#define TK_NK_SEMI 258
#define TK_FILE 259
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -74,6 +74,10 @@ typedef struct SDatabaseOptions { ...@@ -74,6 +74,10 @@ typedef struct SDatabaseOptions {
int8_t singleStable; int8_t singleStable;
SNodeList* pRetentions; SNodeList* pRetentions;
int8_t schemaless; int8_t schemaless;
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SDatabaseOptions; } SDatabaseOptions;
typedef struct SCreateDatabaseStmt { typedef struct SCreateDatabaseStmt {
......
...@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode { ...@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode {
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
bool isSingleTableJoin; bool isSingleTableJoin;
EOrder inputTsOrder;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
...@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode { ...@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode {
int64_t watermark; int64_t watermark;
int8_t igExpired; int8_t igExpired;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SFillLogicNode { typedef struct SFillLogicNode {
...@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode { ...@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode {
SNode* pTimeSeries; // SColumnNode SNode* pTimeSeries; // SColumnNode
} SInterpFuncPhysiNode; } SInterpFuncPhysiNode;
typedef struct SJoinPhysiNode { typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node; SPhysiNode node;
EJoinType joinType; EJoinType joinType;
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
} SJoinPhysiNode; EOrder inputTsOrder;
} SSortMergeJoinPhysiNode;
typedef SJoinPhysiNode SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode { typedef struct SAggPhysiNode {
SPhysiNode node; SPhysiNode node;
......
...@@ -255,6 +255,7 @@ typedef struct SSelectStmt { ...@@ -255,6 +255,7 @@ typedef struct SSelectStmt {
int32_t selectFuncNum; int32_t selectFuncNum;
bool isEmptyResult; bool isEmptyResult;
bool isTimeLineResult; bool isTimeLineResult;
bool isSubquery;
bool hasAggFuncs; bool hasAggFuncs;
bool hasRepeatScanFuncs; bool hasRepeatScanFuncs;
bool hasIndefiniteRowsFunc; bool hasIndefiniteRowsFunc;
......
...@@ -114,21 +114,30 @@ typedef struct SWal { ...@@ -114,21 +114,30 @@ typedef struct SWal {
int64_t refId; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
// ref // ref
SHashObj *pRefHash; // ref -> SWalRef SHashObj *pRefHash; // refId -> SWalRef
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
// reusable write head // reusable write head
SWalCkHead writeHead; SWalCkHead writeHead;
} SWal; // WAL HANDLE } SWal;
typedef struct {
int64_t refId;
int64_t refVer;
int64_t refFile;
SWal *pWal;
} SWalRef;
typedef struct { typedef struct {
int8_t scanUncommited; int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta; int8_t scanMeta;
int8_t enableRef; int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct { typedef struct {
SWal *pWal; SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile; TdFilePtr pLogFile;
TdFilePtr pIdxFile; TdFilePtr pIdxFile;
int64_t curFileFirstVer; int64_t curFileFirstVer;
...@@ -138,7 +147,8 @@ typedef struct { ...@@ -138,7 +147,8 @@ typedef struct {
int8_t curStopped; int8_t curStopped;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
SWalCkHead *pHead; // TODO remove it
SWalCkHead *pHead;
} SWalReader; } SWalReader;
// module initialization // module initialization
...@@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_ ...@@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen); int32_t bodyLen);
// This interface assign version automatically and return to caller. // Assign version automatically and return to caller,
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// -1 will be returned for failed writes // -1 will be returned for failed writes
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
...@@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); ...@@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId; SWalRef *walRefCommittedVer(SWal *);
int64_t ver;
} SWalRef;
SWalRef *walOpenRef(SWal *); SWalRef *walOpenRef(SWal *);
void walCloseRef(SWalRef *); void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *); void walUnrefVer(SWalRef *);
// help function for raft // helper function for raft
bool walLogExist(SWal *, int64_t ver); bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *); bool walIsEmpty(SWal *);
......
...@@ -358,6 +358,15 @@ typedef enum ELogicConditionType { ...@@ -358,6 +358,15 @@ typedef enum ELogicConditionType {
#define TSDB_DB_SCHEMALESS_OFF 0 #define TSDB_DB_SCHEMALESS_OFF 0
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD 0
#define TSDB_DB_MIN_WAL_RETENTION_SIZE -1
#define TSDB_DEFAULT_DB_WAL_RETENTION_SIZE 0
#define TSDB_DB_MIN_WAL_ROLL_PERIOD 0
#define TSDB_DEFAULT_DB_WAL_ROLL_PERIOD 0
#define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0
#define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0
#define TSDB_MIN_ROLLUP_MAX_DELAY 1 // unit millisecond #define TSDB_MIN_ROLLUP_MAX_DELAY 1 // unit millisecond
#define TSDB_MAX_ROLLUP_MAX_DELAY (15 * 60 * 1000) #define TSDB_MAX_ROLLUP_MAX_DELAY (15 * 60 * 1000)
#define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond #define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond
......
...@@ -67,7 +67,6 @@ extern int32_t idxDebugFlag; ...@@ -67,7 +67,6 @@ extern int32_t idxDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles); int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog(); void taosCloseLog();
void taosResetLog(); void taosResetLog();
void taosSetAllDebugFlag(int32_t flag);
void taosDumpData(uint8_t *msg, int32_t len); void taosDumpData(uint8_t *msg, int32_t len);
void taosPrintLog(const char *flags, ELogLevel level, int32_t dflag, const char *format, ...) void taosPrintLog(const char *flags, ELogLevel level, int32_t dflag, const char *format, ...)
......
...@@ -2019,7 +2019,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -2019,7 +2019,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
} }
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) || if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
('0' <= *(tbList + i) && '9' >= *(tbList + i))) { ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
if (vLen[vIdx] > 0) { if (vLen[vIdx] > 0) {
goto _return; goto _return;
} }
......
...@@ -973,7 +973,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -973,7 +973,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, NULL, NULL); code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, pRequest->body.param, NULL);
if (code) { if (code) {
goto _return; goto _return;
} }
......
...@@ -1763,9 +1763,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1763,9 +1763,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|\n", flag, len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid); pDataBlock->info.uid, pDataBlock->info.rows);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
...@@ -1878,7 +1878,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1878,7 +1878,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += sizeof(SSubmitBlk); msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0; int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf
bool isStartKey = false; bool isStartKey = false;
int32_t offset = 0; int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column for (int32_t k = 0; k < colNum; ++k) { // iterate by column
......
...@@ -1143,6 +1143,10 @@ void taosCfgDynamicOptions(const char *option, const char *value) { ...@@ -1143,6 +1143,10 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
int32_t monitor = atoi(value); int32_t monitor = atoi(value);
uInfo("monitor set from %d to %d", tsEnableMonitor, monitor); uInfo("monitor set from %d to %d", tsEnableMonitor, monitor);
tsEnableMonitor = monitor; tsEnableMonitor = monitor;
SConfigItem *pItem = cfgGetItem(tsCfg, "monitor");
if (pItem != NULL) {
pItem->bval = tsEnableMonitor;
}
return; return;
} }
...@@ -1166,8 +1170,39 @@ void taosCfgDynamicOptions(const char *option, const char *value) { ...@@ -1166,8 +1170,39 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
int32_t flag = atoi(value); int32_t flag = atoi(value);
uInfo("%s set from %d to %d", optName, *optionVars[d], flag); uInfo("%s set from %d to %d", optName, *optionVars[d], flag);
*optionVars[d] = flag; *optionVars[d] = flag;
taosSetDebugFlag(optionVars[d], optName, flag);
return; return;
} }
uError("failed to cfg dynamic option:%s value:%s", option, value); uError("failed to cfg dynamic option:%s value:%s", option, value);
} }
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal) {
SConfigItem *pItem = cfgGetItem(tsCfg, flagName);
if (pItem != NULL) {
pItem->i32 = flagVal;
}
*pFlagPtr = flagVal;
}
void taosSetAllDebugFlag(int32_t flag) {
if (flag <= 0) return;
taosSetDebugFlag(&uDebugFlag, "uDebugFlag", flag);
taosSetDebugFlag(&rpcDebugFlag, "rpcDebugFlag", flag);
taosSetDebugFlag(&jniDebugFlag, "jniDebugFlag", flag);
taosSetDebugFlag(&qDebugFlag, "qDebugFlag", flag);
taosSetDebugFlag(&cDebugFlag, "cDebugFlag", flag);
taosSetDebugFlag(&dDebugFlag, "dDebugFlag", flag);
taosSetDebugFlag(&vDebugFlag, "vDebugFlag", flag);
taosSetDebugFlag(&mDebugFlag, "mDebugFlag", flag);
taosSetDebugFlag(&wDebugFlag, "wDebugFlag", flag);
taosSetDebugFlag(&sDebugFlag, "sDebugFlag", flag);
taosSetDebugFlag(&tsdbDebugFlag, "tsdbDebugFlag", flag);
taosSetDebugFlag(&tqDebugFlag, "tqDebugFlag", flag);
taosSetDebugFlag(&fsDebugFlag, "fsDebugFlag", flag);
taosSetDebugFlag(&udfDebugFlag, "udfDebugFlag", flag);
taosSetDebugFlag(&smaDebugFlag, "smaDebugFlag", flag);
taosSetDebugFlag(&idxDebugFlag, "idxDebugFlag", flag);
uInfo("all debug flag are set to %d", flag);
}
...@@ -2018,6 +2018,10 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { ...@@ -2018,6 +2018,10 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1; if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walSegmentSize) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1; if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
...@@ -2060,6 +2064,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) ...@@ -2060,6 +2064,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1; if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walSegmentSize) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1; if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention)); pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
......
...@@ -164,8 +164,8 @@ typedef struct { ...@@ -164,8 +164,8 @@ typedef struct {
int32_t lastErrorNo; int32_t lastErrorNo;
tmsg_t lastMsgType; tmsg_t lastMsgType;
SEpSet lastEpset; SEpSet lastEpset;
char dbname1[TSDB_DB_FNAME_LEN]; char dbname1[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_DB_FNAME_LEN]; char dbname2[TSDB_TABLE_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
......
...@@ -874,7 +874,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -874,7 +874,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
} }
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("config rsp from dnode, app:%p", pRsp->info.ahandle); mInfo("config rsp from dnode");
return 0; return 0;
} }
......
...@@ -281,7 +281,7 @@ static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffse ...@@ -281,7 +281,7 @@ static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffse
} }
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
...@@ -297,15 +297,15 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -297,15 +297,15 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset); sdbRelease(pSdb, pOffset);
goto END; sdbCancelFetch(pSdb, pIter);
code = -1;
break;
} }
sdbRelease(pSdb, pOffset); sdbRelease(pSdb, pOffset);
} }
code = 0; return code;
END:
return code;
} }
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
......
...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock); taosWUnLockLatch(&pConn->queryLock);
......
...@@ -641,6 +641,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -641,6 +641,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB; action.msgType = TDMT_VND_CREATE_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST; action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
...@@ -805,7 +806,7 @@ _OVER: ...@@ -805,7 +806,7 @@ _OVER:
} }
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
...@@ -1612,7 +1613,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb ...@@ -1612,7 +1613,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name); mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (needRsp) { if (needRsp) {
void *pCont = NULL; void *pCont = NULL;
...@@ -1811,7 +1812,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p ...@@ -1811,7 +1812,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
......
...@@ -824,7 +824,7 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj ...@@ -824,7 +824,7 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj
} }
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
...@@ -840,12 +840,14 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -840,12 +840,14 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
goto END; sdbCancelFetch(pSdb, pIter);
code = -1;
break;
} }
sdbRelease(pSdb, pSub);
} }
code = 0;
END:
return code; return code;
} }
......
...@@ -833,7 +833,7 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { ...@@ -833,7 +833,7 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
} }
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
...@@ -848,11 +848,14 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -848,11 +848,14 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
} }
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) { if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
goto END; sdbRelease(pSdb, pTopic);
sdbCancelFetch(pSdb, pIter);
code = -1;
break;
} }
sdbRelease(pSdb, pTopic);
} }
code = 0;
END:
return code; return code;
} }
...@@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, 0, _OVER) SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER) SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
...@@ -290,8 +290,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -290,8 +290,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->exec = exec; pTrans->exec = exec;
pTrans->oper = oper; pTrans->oper = oper;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
...@@ -727,10 +727,10 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c ...@@ -727,10 +727,10 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
if (dbname1 != NULL) { if (dbname1 != NULL) {
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN); tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN);
} }
if (dbname2 != NULL) { if (dbname2 != NULL) {
memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN); tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN);
} }
} }
...@@ -1289,6 +1289,19 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1289,6 +1289,19 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
} else { } else {
pTrans->code = terrno; pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) {
if (pTrans->failedTimes < 6) {
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
taosMsleep(1000);
continueExec = true;
return true;
}
}
}
pTrans->stage = TRN_STAGE_ROLLBACK; pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
continueExec = true; continueExec = true;
......
...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader; typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_ORDER 1 #define TIMEWINDOW_RANGE_CONTAINED 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2 #define TIMEWINDOW_RANGE_EXTERNAL 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
......
...@@ -104,6 +104,8 @@ typedef struct { ...@@ -104,6 +104,8 @@ typedef struct {
// TODO remove // TODO remove
SWalReader* pWalReader; SWalReader* pWalReader;
SWalRef* pRef;
// push // push
STqPushHandle pushHandle; STqPushHandle pushHandle;
......
...@@ -268,6 +268,7 @@ struct SVnode { ...@@ -268,6 +268,7 @@ struct SVnode {
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
int32_t blockCount; int32_t blockCount;
bool restored;
tsem_t syncSem; tsem_t syncSem;
SQHandle* pQuery; SQHandle* pQuery;
}; };
......
...@@ -178,7 +178,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -178,7 +178,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
if (metaGetTableEntryByName(&mr, pReq->name) == 0) { if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
// TODO: just for pass case // TODO: just for pass case
#if 0 #if 0
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
#else #else
...@@ -223,7 +223,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb ...@@ -223,7 +223,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
// check if super table exists // check if super table exists
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData); rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) { if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1;
}
}
/*}*/ /*}*/
/*}*/ /*}*/
...@@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
code = -1; code = -1;
goto OVER; goto OVER;
...@@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType; pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta; pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) {
ASSERT(0);
}
int64_t ver = pRef->refVer;
pHandle->pRef = pRef;
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
// TODO version should be assigned in preprocess
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
...@@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader); ASSERT(pHandle->execHandle.pExecReader);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
......
...@@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0); ASSERT(0);
} }
TXN txn; TXN txn = {0};
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0); ASSERT(0);
...@@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle; STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = { SReadHandle reader = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
...@@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) {
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader); ASSERT(handle.execHandle.pExecReader);
} else { } else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
......
...@@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { ...@@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
break; break;
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:{
pColAgg->sum += colVal.value.i8;
if (pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
}
if (pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
}
break; break;
case TSDB_DATA_TYPE_SMALLINT: }
case TSDB_DATA_TYPE_SMALLINT:{
pColAgg->sum += colVal.value.i16;
if (pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
}
if (pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
}
break; break;
}
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32; pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) { if (pColAgg->min > colVal.value.i32) {
...@@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { ...@@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
} }
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:{
pColAgg->sum += colVal.value.f;
if (pColAgg->min > colVal.value.f) {
pColAgg->min = colVal.value.f;
}
if (pColAgg->max < colVal.value.f) {
pColAgg->max = colVal.value.f;
}
break; break;
case TSDB_DATA_TYPE_DOUBLE: }
case TSDB_DATA_TYPE_DOUBLE:{
pColAgg->sum += colVal.value.d;
if (pColAgg->min > colVal.value.d) {
pColAgg->min = colVal.value.d;
}
if (pColAgg->max < colVal.value.d) {
pColAgg->max = colVal.value.d;
}
break; break;
}
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:{
if (pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
}
if (pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
}
break; break;
}
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:{
pColAgg->sum += colVal.value.u8;
if (pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
}
if (pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
}
break; break;
case TSDB_DATA_TYPE_USMALLINT: }
case TSDB_DATA_TYPE_USMALLINT:{
pColAgg->sum += colVal.value.u16;
if (pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
}
if (pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
}
break; break;
case TSDB_DATA_TYPE_UINT: }
case TSDB_DATA_TYPE_UINT:{
pColAgg->sum += colVal.value.u32;
if (pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
}
if (pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
}
break; break;
case TSDB_DATA_TYPE_UBIGINT: }
case TSDB_DATA_TYPE_UBIGINT:{
pColAgg->sum += colVal.value.u64;
if (pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
}
if (pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
}
break; break;
}
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
break; break;
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
......
...@@ -16,23 +16,28 @@ ...@@ -16,23 +16,28 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnd.h" #include "vnd.h"
#define BATCH_DISABLE 1
static inline bool vnodeIsMsgBlock(tmsg_t type) { static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL); (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) ||
(type == TDMT_VND_ALTER_REPLICA);
} }
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_wait(&pVnode->syncSem); tsem_wait(&pVnode->syncSem);
} }
} }
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
} }
...@@ -124,60 +129,147 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -124,60 +129,147 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} }
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
const STraceId *trace = &pMsg->info.traceId;
vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
}
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SYN_NOT_LEADER) {
vnodeRedirectRpcMsg(pVnode, pMsg);
} else {
const STraceId *trace = &pMsg->info.traceId;
vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
if (code > 0) {
ASSERT(0);
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
if (*arrSize <= 0) return;
#if BATCH_DISABLE
int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
#endif
if (code > 0) {
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]);
} else {
if (terrno != 0) code = terrno;
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleProposeError(pVnode, pMsgArr[i], code);
}
}
for (int32_t i = 0; i < *arrSize; ++i) {
SRpcMsg *pMsg = pMsgArr[i];
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
*arrSize = 0;
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs); vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t m = 0; m < numOfMsgs; m++) { for (int32_t msg = 0; msg < numOfMsgs; msg++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
bool isBlock = vnodeIsMsgBlock(pMsg->msgType);
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg);
terrno = TSDB_CODE_APP_NOT_READY;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
if (pMsgArr == NULL || pIsWeakArr == NULL) {
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
vnodeHandleProposeError(pVnode, pMsg, terrno);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
} else { rpcFreeCont(pMsg->pCont);
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { taosFreeQitem(pMsg);
code = vnodeProcessAlterReplicaReq(pVnode, pMsg); continue;
} else {
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
if (code > 0) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
}
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
}
}
} }
if (code < 0) { if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) { vnodeHandleAlterReplicaReq(pVnode, pMsg);
vnodeRedirectRpcMsg(pVnode, pMsg); continue;
} else {
if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
} }
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); if (isBlock || BATCH_DISABLE) {
rpcFreeCont(pMsg->pCont); vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
taosFreeQitem(pMsg); }
pMsgArr[arrayPos] = pMsg;
pIsWeakArr[arrayPos] = isWeak;
arrayPos++;
if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
}
} }
taosMemoryFree(pMsgArr);
taosMemoryFree(pIsWeakArr);
} }
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
...@@ -527,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm ...@@ -527,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
} }
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
pVnode->restored = true;
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
...@@ -534,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -534,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
...@@ -588,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) { ...@@ -588,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) {
return false; return false;
} }
// todo if (!pVnode->restored) {
// if (!pVnode->restored) { terrno = TSDB_CODE_APP_NOT_READY;
// terrno = TSDB_CODE_APP_NOT_READY; return false;
// return false; }
// }
return true; return true;
} }
\ No newline at end of file
...@@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
pPhysiChildren = pJoinNode->node.pChildren; pPhysiChildren = pJoinNode->node.pChildren;
break; break;
} }
...@@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_ROW_NEW(level,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT, QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT
: EXPLAIN_TBL_SCAN_FORMAT,
pTblScanNode->scan.tableName.tname); pTblScanNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
...@@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pSTblScanNode->scan.pScanPseudoCols) { if (pSTblScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
...@@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
...@@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pDistScanNode->pScanPseudoCols) { if (pDistScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
...@@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode)); EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
...@@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
} }
break; break;
} }
default: default:
qError("not supported physical node type %d", pNode->type); qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
......
...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow* pResultRow); void initResultRow(SResultRow* pResultRow);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow);
......
...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo { ...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder* pRecoder; SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize; uint64_t winInfoSize;
uint64_t tableInfoSize; uint64_t tableInfoSize;
uint64_t hashSize; uint64_t hashSize;
...@@ -321,6 +320,47 @@ typedef struct STableScanInfo { ...@@ -321,6 +320,47 @@ typedef struct STableScanInfo {
int8_t noTable; int8_t noTable;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo *pCols; SColumnInfo *pCols;
SSDataBlock *pRes; SSDataBlock *pRes;
...@@ -352,6 +392,11 @@ typedef enum EStreamScanMode { ...@@ -352,6 +392,11 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_DATAREADER_RANGE, STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode; } EStreamScanMode;
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
typedef struct SCatchSupporter { typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
...@@ -549,6 +594,7 @@ typedef struct SProjectOperatorInfo { ...@@ -549,6 +594,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo limitInfo; SLimitInfo limitInfo;
bool mergeDataBlocks; bool mergeDataBlocks;
SSDataBlock* pFinalRes; SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SIndefOperatorInfo { typedef struct SIndefOperatorInfo {
...@@ -881,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -881,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
...@@ -954,6 +1000,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, ...@@ -954,6 +1000,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
......
...@@ -182,7 +182,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { ...@@ -182,7 +182,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pDeleter->pParam->pUidList = NULL;
pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols; pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed; pOutput->compressed = pEntry->compressed;
...@@ -205,6 +206,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { ...@@ -205,6 +206,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData); taosMemoryFreeClear(pDeleter->nextOutput.pData);
taosArrayDestroy(pDeleter->pParam->pUidList);
taosMemoryFree(pDeleter->pParam);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) { while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataDeleterBuf* pBuf = NULL; SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
......
...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { ...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
} }
} }
void closeAllResultRows(SResultRowInfo* pResultRowInfo) {
// do nothing
}
bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { ...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray* createSortInfo(SNodeList* pNodeList) { SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = 0; size_t numOfCols = 0;
if (pNodeList != NULL) { if (pNodeList != NULL) {
numOfCols = LIST_LENGTH(pNodeList); numOfCols = LIST_LENGTH(pNodeList);
} else { } else {
numOfCols = 0; numOfCols = 0;
} }
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) { if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData idata = SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
} }
} }
#ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num);
#endif
if (p != NULL) { if (p != NULL) {
p->subsidiaries.pCtx = pValCtx; p->subsidiaries.pCtx = pValCtx;
p->subsidiaries.num = num; p->subsidiaries.num = num;
...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node // TODO: get it from stable scan node
pCond->twindows = pTableScanNode->scanRange; pCond->twindows = pTableScanNode->scanRange;
pCond->suid = pTableScanNode->scan.suid; pCond->suid = pTableScanNode->scan.suid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;
...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
// todo refactor
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order) { int32_t order) {
STimeWindow w = {0}; STimeWindow w = {0};
......
...@@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
} }
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); ...@@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) { if (pOperator == NULL || pInfo == NULL) {
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator"; pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
SNode* pMergeCondition = pJoinNode->pMergeCondition; SNode* pMergeCondition = pJoinNode->pMergeCondition;
if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) {
...@@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { ...@@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pCondAfterMerge); nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
......
此差异已折叠。
...@@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
} else { } else {
qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type; int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
pColInfoData = taosArrayGet(p->pDataBlock, 4); pColInfoData = taosArrayGet(p->pDataBlock, 4);
char tagTypeStr[VARSTR_HEADER_SIZE + 32]; char tagTypeStr[VARSTR_HEADER_SIZE + 32];
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
if (tagType == TSDB_DATA_TYPE_VARCHAR) { if (tagType == TSDB_DATA_TYPE_VARCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (tagType == TSDB_DATA_TYPE_NCHAR) { } else if (tagType == TSDB_DATA_TYPE_NCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); tagTypeLen +=
sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
(int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(tagTypeStr, tagTypeLen); varDataSetLen(tagTypeStr, tagTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
...@@ -2527,49 +2530,6 @@ _error: ...@@ -2527,49 +2530,6 @@ _error:
return NULL; return NULL;
} }
typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) { const char* idStr) {
...@@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo)); pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
tsortDestroySortHandle(pInfo->pSortHandle); size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
for (int32_t i = 0; i < numReaders; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
}
taosArrayClear(pInfo->sortSourceParams); taosArrayClear(pInfo->sortSourceParams);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) { tsortDestroySortHandle(pInfo->pSortHandle);
for (int32_t i = 0; i < numReaders; ++i) {
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
tsdbReaderClose(reader); tsdbReaderClose(reader);
} }
taosArrayDestroy(pInfo->dataReaders); taosArrayDestroy(pInfo->dataReaders);
pInfo->dataReaders = NULL; pInfo->dataReaders = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* p = tsortGetSortedDataBlock(pHandle); blockDataCleanup(pResBlock);
if (p == NULL) { blockDataEnsureCapacity(pResBlock, capacity);
return NULL;
}
blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
...@@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
break; break;
} }
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(pResBlock, pTupleHandle);
if (p->info.rows >= capacity) { if (pResBlock->info.rows >= capacity) {
break; break;
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
} }
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
...@@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) { while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId; pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
...@@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->cond); cleanupQueryTableDataCond(&pTableScanInfo->cond);
taosArrayDestroy(pTableScanInfo->sortSourceParams);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
...@@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo); taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->pseudoSup);
taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
} }
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); SExprSupp* pSup = &pInfo->pseudoSup;
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset); pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
......
...@@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pSliceInfo->pRes; SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
// if (pOperator->status == OP_RES_TO_RETURN) { // if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
...@@ -2350,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2350,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange; pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
pOperator->name = "TimeSliceOperator"; pOperator->name = "TimeSliceOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
...@@ -2774,7 +2769,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2774,7 +2769,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -2783,7 +2778,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2783,7 +2778,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pPullDataRes->info.rows != 0) { if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
ASSERT(IS_FINAL_OP(pInfo)); ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
...@@ -2798,20 +2793,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2798,20 +2793,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
return NULL; return NULL;
} }
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} else { } else {
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false; pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo)); ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data // process the rest of the data
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
...@@ -2819,13 +2814,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2819,13 +2814,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
// if (pInfo->pPullDataRes->info.rows != 0) { // if (pInfo->pPullDataRes->info.rows != 0) {
// // process the rest of the data // // process the rest of the data
// ASSERT(IS_FINAL_OP(pInfo)); // ASSERT(IS_FINAL_OP(pInfo));
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); // printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// return pInfo->pPullDataRes; // return pInfo->pPullDataRes;
// } // }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
} }
...@@ -2836,10 +2831,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2836,10 +2831,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock(pInfo->pUpdateRes); clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins); removeDeleteResults(pUpdated, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
...@@ -2939,20 +2934,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2939,20 +2934,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pPullDataRes->info.rows != 0) { if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
ASSERT(IS_FINAL_OP(pInfo)); ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) { if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false; pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo)); ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data // process the rest of the data
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
...@@ -2960,7 +2955,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2960,7 +2955,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
// ASSERT(false); // ASSERT(false);
...@@ -3820,14 +3815,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3820,14 +3815,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3840,7 +3835,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3840,7 +3835,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
...@@ -3917,11 +3912,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3917,11 +3912,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "Final Session" : "Single Session"); printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -3960,21 +3955,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -3960,21 +3955,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) { if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "Semi Session"); printDataBlock(pBInfo->pRes, "sems session");
return pBInfo->pRes; return pBInfo->pRes;
} }
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true; pInfo->returnDelete = true;
printDataBlock(pInfo->pDelRes, "Semi Session"); printDataBlock(pInfo->pDelRes, "sems session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session"); printDataBlock(pInfo->pUpdateRes, "sems session");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
...@@ -4038,21 +4033,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4038,21 +4033,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) { if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "Semi Session"); printDataBlock(pBInfo->pRes, "sems session");
return pBInfo->pRes; return pBInfo->pRes;
} }
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true; pInfo->returnDelete = true;
printDataBlock(pInfo->pDelRes, "Semi Session"); printDataBlock(pInfo->pDelRes, "sems session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session"); printDataBlock(pInfo->pUpdateRes, "sems session");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
......
此差异已折叠。
此差异已折叠。
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h" #include "taosudf.h"
...@@ -35,6 +40,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { ...@@ -35,6 +40,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
udfColDataSet(resultCol, i, (char *)&luckyNum, false); udfColDataSet(resultCol, i, (char *)&luckyNum, false);
} }
} }
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0; return 0;
} }
\ No newline at end of file
...@@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk ...@@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册