executorimpl.h 43.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15
// clang-format off
16 17 18
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H

19 20 21 22
#ifdef __cplusplus
extern "C" {
#endif

23
#include "os.h"
S
common  
Shengliang Guan 已提交
24
#include "tcommon.h"
25
#include "tlosertree.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsort.h"
27 28 29
#include "ttszip.h"
#include "tvariant.h"

H
Haojun Liao 已提交
30
#include "dataSinkMgt.h"
31
#include "executil.h"
H
Haojun Liao 已提交
32
#include "executor.h"
H
Haojun Liao 已提交
33
#include "planner.h"
D
dapan1121 已提交
34
#include "scalar.h"
35 36
#include "taosdef.h"
#include "tarray.h"
5
54liuyao 已提交
37
#include "tfill.h"
H
Haojun Liao 已提交
38
#include "thash.h"
39
#include "tlockfree.h"
D
dapan1121 已提交
40
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
41
#include "tpagedbuf.h"
L
Liu Jicong 已提交
42
#include "tstream.h"
L
Liu Jicong 已提交
43
#include "tstreamUpdate.h"
H
Haojun Liao 已提交
44

45
#include "executorInt.h"
L
Liu Jicong 已提交
46
#include "vnode.h"
H
Hongze Cheng 已提交
47

48 49
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);

dengyihao's avatar
dengyihao 已提交
50
#define Q_STATUS_EQUAL(p, s)  (((p) & (s)) != 0u)
5
54liuyao 已提交
51 52 53 54 55
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)

56
enum {
57 58
  // when this task starts to execute, this status will set
  TASK_NOT_COMPLETED = 0x1u,
59

60
  /* Task is over
61 62 63
   * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
   * 2. when all data within queried time window, it is also denoted as query_completed
   */
64
  TASK_COMPLETED = 0x2u,
65 66 67 68 69 70
};

/**
 * If the number of generated results is greater than this value,
 * query query will be halt and return results to client immediate.
 */
dengyihao's avatar
dengyihao 已提交
71 72 73 74 75
typedef struct SResultInfo {  // TODO refactor
  int64_t totalRows;          // total generated result size in rows
  int64_t totalBytes;         // total results in bytes.
  int32_t capacity;           // capacity of current result output buffer
  int32_t threshold;          // result size threshold in rows.
H
Haojun Liao 已提交
76
} SResultInfo;
77 78

typedef struct STableQueryInfo {
L
Liu Jicong 已提交
79 80
  TSKEY              lastKey;  // last check ts, todo remove it later
  SResultRowPosition pos;      // current active time window
81 82
} STableQueryInfo;

H
Haojun Liao 已提交
83 84 85 86 87
typedef struct SLimit {
  int64_t limit;
  int64_t offset;
} SLimit;

88
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
89

H
Haojun Liao 已提交
90
typedef struct STaskCostInfo {
91 92
  int64_t  created;
  int64_t  start;
93 94 95 96 97 98 99 100
  uint64_t loadStatisTime;
  uint64_t loadFileBlockTime;
  uint64_t loadDataInCacheTime;
  uint64_t loadStatisSize;
  uint64_t loadFileBlockSize;
  uint64_t loadDataInCacheSize;

  uint64_t loadDataTime;
101 102

  SFileBlockLoadRecorder* pRecoder;
L
Liu Jicong 已提交
103
  uint64_t                elapsedTime;
104

105 106 107 108 109 110 111
  uint64_t winInfoSize;
  uint64_t tableInfoSize;
  uint64_t hashSize;
  uint64_t numOfTimeWindows;

  SArray*   queryProfEvents;      // SArray<SQueryProfEvent>
  SHashObj* operatorProfResults;  // map<operator_type, SQueryProfEvent>
H
Haojun Liao 已提交
112
} STaskCostInfo;
113

H
Haojun Liao 已提交
114
typedef struct SOperatorCostInfo {
L
Liu Jicong 已提交
115 116
  double openCost;
  double totalCost;
H
Haojun Liao 已提交
117 118
} SOperatorCostInfo;

119 120
struct SOperatorInfo;

wmmhello's avatar
wmmhello 已提交
121
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
122
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
123

124
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
125
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
126
typedef void (*__optr_close_fn_t)(void* param);
127
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
H
Haojun Liao 已提交
128

H
Haojun Liao 已提交
129
typedef struct STaskIdInfo {
130 131 132 133
  uint64_t queryId;  // this is also a request id
  uint64_t subplanId;
  uint64_t templateId;
  char*    str;
H
Haojun Liao 已提交
134 135
} STaskIdInfo;

L
Liu Jicong 已提交
136 137 138 139 140 141
enum {
  STREAM_RECOVER_STEP__NONE = 0,
  STREAM_RECOVER_STEP__PREPARE,
  STREAM_RECOVER_STEP__SCAN,
};

L
Liu Jicong 已提交
142
typedef struct {
L
Liu Jicong 已提交
143 144 145 146 147 148
  // TODO remove prepareStatus
  STqOffsetVal prepareStatus;  // for tmq
  STqOffsetVal lastStatus;     // for tmq
  SMqMetaRsp   metaRsp;        // for tmq fetching meta
  int8_t       returned;
  int64_t      snapshotVer;
L
Liu Jicong 已提交
149
  const SSubmitReq*  pReq;
L
Liu Jicong 已提交
150 151 152 153 154 155 156

  SSchemaWrapper*     schema;
  char                tbName[TSDB_TABLE_NAME_LEN];
  SSDataBlock*        pullOverBlk;  // for streaming
  SWalFilterCond      cond;
  int64_t             lastScanUid;
  int8_t              recoverStep;
L
Liu Jicong 已提交
157
  SQueryTableDataCond tableCond;
L
Liu Jicong 已提交
158 159
  int64_t             recoverStartVer;
  int64_t             recoverEndVer;
L
Liu Jicong 已提交
160 161
  int64_t             fillHistoryVer1;
  int64_t             fillHistoryVer2;
L
Liu Jicong 已提交
162
  SStreamState*       pState;
L
Liu Jicong 已提交
163 164
} SStreamTaskInfo;

165 166 167 168 169
typedef struct {
  char*           tablename;
  char*           dbname;
  int32_t         tversion;
  SSchemaWrapper* sw;
170
  SSchemaWrapper* qsw;
171 172
} SSchemaInfo;

173
typedef struct SExecTaskInfo {
L
Liu Jicong 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
  STaskIdInfo   id;
  uint32_t      status;
  STimeWindow   window;
  STaskCostInfo cost;
  int64_t       owner;  // if it is in execution
  int32_t       code;

  int64_t               version;  // used for stream to record wal version
  SStreamTaskInfo       streamInfo;
  SSchemaInfo           schemaInfo;
  STableListInfo        tableqinfoList;  // this is a table list
  const char*           sql;             // query sql string
  jmp_buf               env;             // jump to this position when error happens.
  EOPTR_EXEC_MODEL      execModel;       // operator execution model [batch model|stream model]
  SSubplan*             pSubplan;
189
  struct SOperatorInfo* pRoot;
D
dapan1121 已提交
190
  SLocalFetch      localFetch;
191
} SExecTaskInfo;
H
Haojun Liao 已提交
192

193
enum {
L
Liu Jicong 已提交
194 195
  OP_NOT_OPENED = 0x0,
  OP_OPENED = 0x1,
H
Haojun Liao 已提交
196
  OP_RES_TO_RETURN = 0x5,
L
Liu Jicong 已提交
197
  OP_EXEC_DONE = 0x9,
L
Liu Jicong 已提交
198
  OP_EXEC_RECV = 0x11,
199 200
};

201
typedef struct SOperatorFpSet {
L
Liu Jicong 已提交
202 203 204 205 206 207 208 209
  __optr_open_fn_t    _openFn;  // DO NOT invoke this function directly
  __optr_fn_t         getNextFn;
  __optr_fn_t         getStreamResFn;  // execute the aggregate in the stream model, todo remove it
  __optr_fn_t         cleanupFn;       // call this function to release the allocated resources ASAP
  __optr_close_fn_t   closeFn;
  __optr_encode_fn_t  encodeResultRow;
  __optr_decode_fn_t  decodeResultRow;
  __optr_explain_fn_t getExplainFn;
210 211
} SOperatorFpSet;

212 213
typedef struct SExprSupp {
  SExprInfo*      pExprInfo;
L
Liu Jicong 已提交
214
  int32_t         numOfExprs;  // the number of scalar expression in group operator
215 216
  SqlFunctionCtx* pCtx;
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
H
Haojun Liao 已提交
217
  SFilterInfo*    pFilterInfo;
218 219
} SExprSupp;

220
typedef struct SOperatorInfo {
221
  uint16_t               operatorType;
222
  int16_t                resultDataBlockId;
L
Liu Jicong 已提交
223 224 225 226 227 228 229 230 231 232 233
  bool                   blocking;  // block operator or not
  uint8_t                status;    // denote if current operator is completed
  char*                  name;      // name, for debug purpose
  void*                  info;      // extension attribution
  SExprSupp              exprSupp;
  SExecTaskInfo*         pTaskInfo;
  SOperatorCostInfo      cost;
  SResultInfo            resultInfo;
  struct SOperatorInfo** pDownstream;      // downstram pointer list
  int32_t                numOfDownstream;  // number of downstream. The value is always ONE expect for join operator
  SOperatorFpSet         fpSet;
234 235
} SOperatorInfo;

236 237
typedef enum {
  EX_SOURCE_DATA_NOT_READY = 0x1,
H
Haojun Liao 已提交
238
  EX_SOURCE_DATA_READY     = 0x2,
239 240
  EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
241

242 243 244
#define COL_MATCH_FROM_COL_ID  0x1
#define COL_MATCH_FROM_SLOT_ID 0x2

245
typedef struct SSourceDataInfo {
L
Liu Jicong 已提交
246 247 248 249 250 251
  int32_t            index;
  SRetrieveTableRsp* pRsp;
  uint64_t           totalRows;
  int32_t            code;
  EX_SOURCE_STATUS   status;
  const char*        taskId;
252 253
} SSourceDataInfo;

H
Haojun Liao 已提交
254
typedef struct SLoadRemoteDataInfo {
dengyihao's avatar
dengyihao 已提交
255 256 257
  uint64_t totalSize;     // total load bytes from remote
  uint64_t totalRows;     // total number of rows
  uint64_t totalElapsed;  // total elapsed time
H
Haojun Liao 已提交
258 259
} SLoadRemoteDataInfo;

260
typedef struct SLimitInfo {
L
Liu Jicong 已提交
261 262 263 264 265 266 267
  SLimit   limit;
  SLimit   slimit;
  uint64_t currentGroupId;
  int64_t  remainGroupOffset;
  int64_t  numOfOutputGroups;
  int64_t  remainOffset;
  int64_t  numOfOutputRows;
268 269
} SLimitInfo;

270
typedef struct SExchangeInfo {
L
Liu Jicong 已提交
271 272 273 274
  SArray* pSources;
  SArray* pSourceDataInfo;
  tsem_t  ready;
  void*   pTransporter;
275 276 277
  // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
  // passed by downstream operator
  SArray*             pResultBlockList;
L
Liu Jicong 已提交
278 279 280
  int32_t             rspBlockIndex;  // indicate the return block index in pResultBlockList
  SSDataBlock*        pDummyBlock;    // dummy block, not keep data
  bool                seqLoadData;    // sequential load data or not, false by default
dengyihao's avatar
dengyihao 已提交
281
  int32_t             current;
H
Haojun Liao 已提交
282
  SLoadRemoteDataInfo loadInfo;
283
  uint64_t            self;
284
  SLimitInfo          limitInfo;
285 286
} SExchangeInfo;

287
typedef struct SScanInfo {
L
Liu Jicong 已提交
288 289
  int32_t numOfAsc;
  int32_t numOfDesc;
290 291
} SScanInfo;

292
typedef struct SSampleExecInfo {
L
Liu Jicong 已提交
293 294
  double   sampleRatio;  // data block sample ratio, 1 by default
  uint32_t seed;         // random seed value
295 296
} SSampleExecInfo;

L
Liu Jicong 已提交
297 298
enum {
  TABLE_SCAN__TABLE_ORDER = 1,
299
  TABLE_SCAN__BLOCK_ORDER = 2,
L
Liu Jicong 已提交
300 301
};

302
typedef struct SAggSupporter {
H
Haojun Liao 已提交
303
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
304 305
  char*          keyBuf;               // window key buffer
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
L
Liu Jicong 已提交
306 307
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  int32_t        currentPageId;  // current write page id
308 309 310
} SAggSupporter;

typedef struct {
L
Liu Jicong 已提交
311 312 313 314 315
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
  // current data block needs to be loaded.
  SInterval      interval;
  SAggSupporter* pAggSup;
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
316 317
} SAggOptrPushDownInfo;

318
typedef struct STableScanInfo {
319 320 321
  STsdbReader*           dataReader;
  SReadHandle            readHandle;
  SLimitInfo             limitInfo;
322
  SFileBlockLoadRecorder readRecorder;
L
Liu Jicong 已提交
323 324 325
  SScanInfo              scanInfo;
  int32_t                scanTimes;
  SNode*                 pFilterNode;  // filter info, which is push down by optimizer
326 327 328 329 330 331 332 333 334 335 336 337
  SSDataBlock*           pResBlock;
  SColMatchInfo          matchInfo;
  SExprSupp              pseudoSup;
  SQueryTableDataCond    cond;
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t                dataBlockLoadFlag;
  SSampleExecInfo        sample;  // sample execution info
  int32_t                currentGroupId;
  int32_t                currentTable;
  int8_t                 scanMode;
  SAggOptrPushDownInfo   pdInfo;
  int8_t                 assignBlockUid;
338 339
} STableScanInfo;

340 341 342 343 344 345 346 347 348 349 350 351
typedef struct STableMergeScanInfo {
  STableListInfo* tableListInfo;
  int32_t         tableStartIndex;
  int32_t         tableEndIndex;
  bool            hasGroupId;
  uint64_t        groupId;
  SArray*         dataReaders;  // array of tsdbReaderT*
  SReadHandle     readHandle;
  int32_t         bufPageSize;
  uint32_t        sortBufSize;  // max buffer size for in-memory sort
  SArray*         pSortInfo;
  SSortHandle*    pSortHandle;
H
Haojun Liao 已提交
352 353 354
  SSDataBlock*    pSortInputBlock;
  int64_t         startTs;  // sort start time
  SArray*         sortSourceParams;
355
  SLimitInfo      limitInfo;
356 357 358 359 360 361 362 363 364 365
  SFileBlockLoadRecorder readRecorder;
  int64_t                numOfRows;
  SScanInfo              scanInfo;
  int32_t                scanTimes;
  SNode*                 pFilterNode;  // filter info, which is push down by optimizer
  SqlFunctionCtx*        pCtx;         // which belongs to the direct upstream operator operator query context
  SResultRowInfo*        pResultRowInfo;
  int32_t*               rowEntryInfoOffset;
  SExprInfo*             pExpr;
  SSDataBlock*           pResBlock;
H
Haojun Liao 已提交
366
  SColMatchInfo          matchInfo;
367
  int32_t                numOfOutput;
H
Haojun Liao 已提交
368 369 370 371
  SExprSupp              pseudoSup;
  SQueryTableDataCond    cond;
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t                dataBlockLoadFlag;
372

373 374
  // if the upstream is an interval operator, the interval info is also kept here to get the time
  // window to check if current data block needs to be loaded.
H
Haojun Liao 已提交
375 376 377
  SInterval              interval;
  SSampleExecInfo        sample;  // sample execution info
  SSortExecInfo          sortExecInfo;
378 379
} STableMergeScanInfo;

380
typedef struct STagScanInfo {
L
Liu Jicong 已提交
381 382
  SColumnInfo*    pCols;
  SSDataBlock*    pRes;
H
Haojun Liao 已提交
383
  SColMatchInfo   matchInfo;
L
Liu Jicong 已提交
384 385 386
  int32_t         curPos;
  SReadHandle     readHandle;
  STableListInfo* pTableList;
387 388
} STagScanInfo;

389
typedef struct SLastrowScanInfo {
H
Haojun Liao 已提交
390 391 392 393 394 395 396 397 398 399 400
  SSDataBlock*   pRes;
  SReadHandle    readHandle;
  void*          pLastrowReader;
  SColMatchInfo  matchInfo;
  int32_t*       pSlotIds;
  SExprSupp      pseudoExprSup;
  int32_t        retrieveType;
  int32_t        currentGroupIndex;
  SSDataBlock*   pBufferredRes;
  SArray*        pUidList;
  int32_t        indexOfBufferedRes;
401 402
} SLastrowScanInfo;

5
54liuyao 已提交
403 404 405 406
typedef enum EStreamScanMode {
  STREAM_SCAN_FROM_READERHANDLE = 1,
  STREAM_SCAN_FROM_RES,
  STREAM_SCAN_FROM_UPDATERES,
407
  STREAM_SCAN_FROM_DELETE_DATA,
5
54liuyao 已提交
408
  STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
L
Liu Jicong 已提交
409
  STREAM_SCAN_FROM_DATAREADER_RANGE,
5
54liuyao 已提交
410 411
} EStreamScanMode;

412 413 414 415 416
enum {
  PROJECT_RETRIEVE_CONTINUE = 0x1,
  PROJECT_RETRIEVE_DONE = 0x2,
};

5
54liuyao 已提交
417
typedef struct SCatchSupporter {
L
Liu Jicong 已提交
418 419 420 421
  SHashObj*      pWindowHashTable;  // quick locate the window object for each window
  SDiskbasedBuf* pDataBuf;          // buffer based on blocked-wised disk file
  int32_t        keySize;
  int64_t*       pKeyBuf;
5
54liuyao 已提交
422 423
} SCatchSupporter;

5
54liuyao 已提交
424
typedef struct SStreamAggSupporter {
5
54liuyao 已提交
425 426 427 428 429 430 431 432 433
  int32_t         resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  SSDataBlock*    pScanBlock;
  SStreamState*   pState;
  int64_t         gap;            // stream session window gap
  SqlFunctionCtx* pDummyCtx;      // for combine
  SSHashObj*      pResultRows;
  int32_t         stateKeySize;
  int16_t         stateKeyType;
  SDiskbasedBuf*  pResultBuf;
5
54liuyao 已提交
434 435
} SStreamAggSupporter;

436
typedef struct SWindowSupporter {
5
54liuyao 已提交
437
  SStreamAggSupporter* pStreamAggSup;
L
Liu Jicong 已提交
438
  int64_t              gap;
439
  uint16_t             parentType;
440
  SAggSupporter*       pIntervalAggSup;
441
} SWindowSupporter;
442

443
typedef struct SPartitionBySupporter {
L
Liu Jicong 已提交
444 445 446 447
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
  char*   keyBuf;         // group by keys for hash
  bool    needCalc;       // partition by column
448 449 450
} SPartitionBySupporter;

typedef struct SPartitionDataInfo {
L
Liu Jicong 已提交
451
  uint64_t groupId;
452 453
  char*    tbname;
  SArray*  tags;
L
Liu Jicong 已提交
454
  SArray*  rowIds;
455
} SPartitionDataInfo;
456

5
54liuyao 已提交
457
typedef struct STimeWindowAggSupp {
L
Liu Jicong 已提交
458 459
  int8_t          calTrigger;
  int64_t         waterMark;
5
54liuyao 已提交
460
  int64_t         deleteMark;
L
Liu Jicong 已提交
461
  TSKEY           maxTs;
5
54liuyao 已提交
462
  TSKEY           minTs;
L
Liu Jicong 已提交
463
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
5
54liuyao 已提交
464 465
} STimeWindowAggSupp;

466
typedef struct SStreamScanInfo {
H
Haojun Liao 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
  uint64_t              tableUid;  // queried super table uid
  SExprInfo*            pPseudoExpr;
  int32_t               numOfPseudoExpr;
  SExprSupp             tbnameCalSup;
  SExprSupp             tagCalSup;
  int32_t               primaryTsIndex;  // primary time stamp slot id
  SReadHandle           readHandle;
  SInterval             interval;       // if the upstream is an interval operator, the interval info is also kept here.
  SColMatchInfo         matchInfo;
  SNode*                pCondition;
                        
  SArray*               pBlockLists;  // multiple SSDatablock.
  SSDataBlock*          pRes;         // result SSDataBlock
  SSDataBlock*          pUpdateRes;   // update SSDataBlock
  int32_t               updateResIndex;
  int32_t               blockType;        // current block type
  int32_t               validBlockIndex;  // Is current data has returned?
  uint64_t              numOfExec;        // execution times
  STqReader*            tqReader;

  uint64_t              groupId;
  SUpdateInfo*          pUpdateInfo;
L
Liu Jicong 已提交
489

L
Liu Jicong 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
  EStreamScanMode       scanMode;
  SOperatorInfo*        pStreamScanOp;
  SOperatorInfo*        pTableScanOp;
  SArray*               childIds;
  SWindowSupporter      windowSup;
  SPartitionBySupporter partitionSup;
  SExprSupp*            pPartScalarSup;
  bool                  assignBlockUid;  // assign block uid to groupId, temporarily used for generating rollup SMA.
  int32_t               scanWinIndex;    // for state operator
  int32_t               pullDataResIndex;
  SSDataBlock*          pPullDataRes;    // pull data SSDataBlock
  SSDataBlock*          pDeleteDataRes;  // delete data SSDataBlock
  int32_t               deleteDataIndex;
  STimeWindow           updateWin;
  STimeWindowAggSupp    twAggSup;
  SSDataBlock*          pUpdateDataRes;
L
Liu Jicong 已提交
506
  // status for tmq
L
Liu Jicong 已提交
507 508 509
  SNodeList* pGroupTags;
  SNode*     pTagCond;
  SNode*     pTagIndexCond;
510
} SStreamScanInfo;
H
Haojun Liao 已提交
511

L
Liu Jicong 已提交
512 513 514 515 516 517 518 519 520 521 522 523
typedef struct {
  //  int8_t    subType;
  //  bool      withMeta;
  //  int64_t   suid;
  //  int64_t   snapVersion;
  //  void     *metaInfo;
  //  void     *dataInfo;
  SVnode*       vnode;
  SSDataBlock   pRes;  // result SSDataBlock
  STsdbReader*  dataReader;
  SSnapContext* sContext;
} SStreamRawScanInfo;
524

dengyihao's avatar
dengyihao 已提交
525 526 527 528 529 530
typedef struct SSysTableIndex {
  int8_t init;
  SArray *uids; 
  int32_t lastIdx; 
} SSysTableIndex;

531
typedef struct SSysTableScanInfo {
dengyihao's avatar
dengyihao 已提交
532 533 534 535
  SRetrieveMetaTableRsp* pRsp;
  SRetrieveTableReq      req;
  SEpSet                 epSet;
  tsem_t                 ready;
536 537
  SReadHandle            readHandle;
  int32_t                accountId;
538
  const char*            pUser;
539
  bool                   sysInfo;
540 541 542
  bool                   showRewrite;
  SNode*                 pCondition;  // db_name filter condition, to discard data that are not in current database
  SMTbCursor*            pCur;        // cursor for iterate the local table meta store.
H
Haojun Liao 已提交
543 544
  SSysTableIndex*        pIdx;         // idx for local table meta
  SColMatchInfo          matchInfo;
545 546 547 548
  SName                  name;
  SSDataBlock*           pRes;
  int64_t                numOfBlocks;  // extract basic running information.
  SLoadRemoteDataInfo    loadInfo;
549 550
} SSysTableScanInfo;

551 552 553
typedef struct SBlockDistInfo {
  SSDataBlock* pResBlock;
  void*        pHandle;
554
  SReadHandle  readHandle;
L
Liu Jicong 已提交
555
  uint64_t     uid;  // table uid
556 557
} SBlockDistInfo;

558
// todo remove this
559
typedef struct SOptrBasicInfo {
L
Liu Jicong 已提交
560 561 562
  SResultRowInfo resultRowInfo;
  SSDataBlock*   pRes;
  bool           mergeResultBlock;
563 564
} SOptrBasicInfo;

565
typedef struct SIntervalAggOperatorInfo {
wmmhello's avatar
wmmhello 已提交
566
  // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
567
  SOptrBasicInfo     binfo;              // basic info
wmmhello's avatar
wmmhello 已提交
568
  SAggSupporter      aggSup;             // aggregate supporter
569
  SExprSupp          scalarSupp;         // supporter for perform scalar function
570 571 572 573 574
  SGroupResInfo      groupResInfo;       // multiple results build supporter
  SInterval          interval;           // interval info
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
  STimeWindow        win;                // query time range
  bool               timeWindowInterpo;  // interpolation needed or not
575
  SArray*            pInterpCols;        // interpolation columns
576
  int32_t            resultTsOrder;      // result timestamp order
577
  int32_t            inputOrder;         // input data ts order
578 579
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
  STimeWindowAggSupp twAggSup;
L
Liu Jicong 已提交
580
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
581
  SNode*             pCondition;
582
} SIntervalAggOperatorInfo;
583

584
typedef struct SMergeAlignedIntervalAggOperatorInfo {
L
Liu Jicong 已提交
585
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
586

587
//  bool         hasGroupId;
588 589
  uint64_t     groupId;  // current groupId
  int64_t      curTs;    // current ts
590 591
  SSDataBlock* prefetchedBlock;
  SNode*       pCondition;
592
  SResultRow*  pResultRow;
593 594
} SMergeAlignedIntervalAggOperatorInfo;

595
typedef struct SStreamIntervalOperatorInfo {
wmmhello's avatar
wmmhello 已提交
596
  // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
5
54liuyao 已提交
597
  SOptrBasicInfo     binfo;              // basic info
wmmhello's avatar
wmmhello 已提交
598
  SAggSupporter      aggSup;             // aggregate supporter
5
54liuyao 已提交
599
  SExprSupp          scalarSupp;         // supporter for perform scalar function
5
54liuyao 已提交
600 601 602
  SGroupResInfo      groupResInfo;       // multiple results build supporter
  SInterval          interval;           // interval info
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
603 604 605 606 607 608
  STimeWindowAggSupp twAggSup;
  bool               invertible;
  bool               ignoreExpiredData;
  SArray*            pDelWins;           // SWinRes
  int32_t            delIndex;
  SSDataBlock*       pDelRes;
609
  SPhysiNode*        pPhyNode;           // create new child
5
54liuyao 已提交
610
  SHashObj*          pPullDataMap;
611
  SArray*            pPullWins;          // SPullWindowInfo
5
54liuyao 已提交
612 613
  int32_t            pullIndex;
  SSDataBlock*       pPullDataRes;
614 615 616 617
  bool               isFinal;
  SArray*            pChildren;
  SStreamState*      pState;
  SWinKey            delKey;
618
  SHashObj*          pGroupIdTbNameMap;  // uint64_t -> char[TSDB_TABLE_NAME_LEN]
619
} SStreamIntervalOperatorInfo;
5
54liuyao 已提交
620

621
typedef struct SAggOperatorInfo {
wmmhello's avatar
wmmhello 已提交
622
  // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
L
Liu Jicong 已提交
623 624
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
wmmhello's avatar
wmmhello 已提交
625

L
Liu Jicong 已提交
626 627 628 629 630
  STableQueryInfo* current;
  uint64_t         groupId;
  SGroupResInfo    groupResInfo;
  SExprSupp        scalarExprSup;
  SNode*           pCondition;
631 632 633
} SAggOperatorInfo;

typedef struct SProjectOperatorInfo {
L
Liu Jicong 已提交
634 635 636 637 638 639 640 641
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
  SNode*         pFilterNode;  // filter info, which is push down by optimizer
  SArray*        pPseudoColInfo;
  SLimitInfo     limitInfo;
  bool           mergeDataBlocks;
  SSDataBlock*   pFinalRes;
  SNode*         pCondition;
642 643
} SProjectOperatorInfo;

H
Haojun Liao 已提交
644
typedef struct SIndefOperatorInfo {
L
Liu Jicong 已提交
645 646 647 648 649 650
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
  SArray*        pPseudoColInfo;
  SExprSupp      scalarSup;
  SNode*         pCondition;
  uint64_t       groupId;
651

L
Liu Jicong 已提交
652
  SSDataBlock* pNextGroupRes;
H
Haojun Liao 已提交
653 654
} SIndefOperatorInfo;

655
typedef struct SFillOperatorInfo {
656 657
  struct SFillInfo* pFillInfo;
  SSDataBlock*      pRes;
H
Haojun Liao 已提交
658
  SSDataBlock*      pFinalRes;
659 660 661
  int64_t           totalInputRows;
  void**            p;
  SSDataBlock*      existNewGroupBlock;
662
  STimeWindow       win;
S
slzhou 已提交
663
  SNode*            pCondition;
H
Haojun Liao 已提交
664
  SColMatchInfo     matchInfo;
H
Haojun Liao 已提交
665
  int32_t           primaryTsCol;
H
Haojun Liao 已提交
666
  int32_t           primarySrcSlotId;
L
Liu Jicong 已提交
667
  uint64_t          curGroupId;  // current handled group id
668 669
  SExprInfo*        pExprInfo;
  int32_t           numOfExpr;
670
  SExprSupp         noFillExprSupp;
671 672 673
} SFillOperatorInfo;

typedef struct SGroupbyOperatorInfo {
L
Liu Jicong 已提交
674 675
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
H
Haojun Liao 已提交
676 677 678 679 680 681 682 683
  SArray*        pGroupCols;     // group by columns, SArray<SColumn>
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
  SNode*         pCondition;
  bool           isInit;         // denote if current val is initialized or not
  char*          keyBuf;         // group by keys for hash
  int32_t        groupKeyLen;    // total group by column width
  SGroupResInfo  groupResInfo;
  SExprSupp      scalarSup;
684 685
} SGroupbyOperatorInfo;

H
Haojun Liao 已提交
686
typedef struct SDataGroupInfo {
L
Liu Jicong 已提交
687 688 689
  uint64_t groupId;
  int64_t  numOfRows;
  SArray*  pPageList;
H
Haojun Liao 已提交
690 691 692 693
} SDataGroupInfo;

// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
dengyihao's avatar
dengyihao 已提交
694 695 696 697 698 699 700
  SOptrBasicInfo binfo;
  SArray*        pGroupCols;
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
  char*          keyBuf;         // group by keys for hash
  int32_t        groupKeyLen;    // total group by column width
  SHashObj*      pGroupSet;      // quick locate the window object for each result

L
Liu Jicong 已提交
701 702 703 704 705 706
  SDiskbasedBuf* pBuf;              // query result buffer based on blocked-wised disk file
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
  int32_t*       columnOffset;      // start position for each column data
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
  int32_t        groupIndex;        // group index
  int32_t        pageIndex;         // page index of current group
707
  SExprSupp      scalarSup;
H
Haojun Liao 已提交
708
} SPartitionOperatorInfo;
709

710
typedef struct SWindowRowsSup {
dengyihao's avatar
dengyihao 已提交
711 712 713 714
  STimeWindow win;
  TSKEY       prevTs;
  int32_t     startRowIndex;
  int32_t     numOfRows;
715
  uint64_t    groupId;
716 717
} SWindowRowsSup;

H
Haojun Liao 已提交
718
typedef struct SSessionAggOperatorInfo {
L
Liu Jicong 已提交
719 720
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
wmmhello's avatar
wmmhello 已提交
721

722 723
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
L
Liu Jicong 已提交
724 725 726
  bool               reptScan;  // next round scan
  int64_t            gap;       // session window gap
  int32_t            tsSlotId;  // primary timestamp slot id
727
  STimeWindowAggSupp twAggSup;
L
Liu Jicong 已提交
728
  const SNode*       pCondition;
H
Haojun Liao 已提交
729
} SSessionAggOperatorInfo;
730

5
54liuyao 已提交
731
typedef struct SResultWindowInfo {
5
54liuyao 已提交
732 733
  void*              pOutputBuf;
  SSessionKey        sessionWin;
L
Liu Jicong 已提交
734
  bool               isOutput;
5
54liuyao 已提交
735 736
} SResultWindowInfo;

5
54liuyao 已提交
737 738
typedef struct SStateWindowInfo {
  SResultWindowInfo winInfo;
5
54liuyao 已提交
739
  SStateKeys*       pStateKey;
5
54liuyao 已提交
740 741
} SStateWindowInfo;

5
54liuyao 已提交
742
typedef struct SStreamSessionAggOperatorInfo {
L
Liu Jicong 已提交
743 744
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
5
54liuyao 已提交
745
  SExprSupp           scalarSupp;      // supporter for perform scalar function
L
Liu Jicong 已提交
746 747 748 749 750
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  int32_t             endTsIndex;      // window end timestamp slot id
  int32_t             order;           // current SSDataBlock scan order
  STimeWindowAggSupp  twAggSup;
5
54liuyao 已提交
751 752 753
  SSDataBlock*        pWinBlock;       // window result
  SSDataBlock*        pDelRes;         // delete result
  SSDataBlock*        pUpdateRes;      // update window
L
Liu Jicong 已提交
754
  bool                returnUpdate;
5
54liuyao 已提交
755
  SSHashObj*          pStDeleted;
L
Liu Jicong 已提交
756
  void*               pDelIterator;
5
54liuyao 已提交
757 758
  SArray*             pChildren;       // cache for children's result; final stream operator
  SPhysiNode*         pPhyNode;        // create new child
L
Liu Jicong 已提交
759 760
  bool                isFinal;
  bool                ignoreExpiredData;
761
  SHashObj*           pGroupIdTbNameMap;
5
54liuyao 已提交
762 763
} SStreamSessionAggOperatorInfo;

5
54liuyao 已提交
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779
typedef struct SStreamStateAggOperatorInfo {
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
  SExprSupp           scalarSupp;      // supporter for perform scalar function
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  STimeWindowAggSupp  twAggSup;
  SColumn             stateCol;
  SSDataBlock*        pDelRes;
  SSHashObj*          pSeDeleted;
  void*               pDelIterator;
  SArray*             pChildren;       // cache for children's result;
  bool                ignoreExpiredData;
  SHashObj*           pGroupIdTbNameMap;
} SStreamStateAggOperatorInfo;

780 781 782 783
typedef struct SStreamPartitionOperatorInfo {
  SOptrBasicInfo        binfo;
  SPartitionBySupporter partitionSup;
  SExprSupp             scalarSup;
784
  SExprSupp             tbnameCalSup;
L
Liu Jicong 已提交
785
  SExprSupp             tagCalSup;
786 787 788 789
  SHashObj*             pPartitions;
  void*                 parIte;
  SSDataBlock*          pInputDataBlock;
  int32_t               tsColIndex;
790
  SSDataBlock*          pDelRes;
791 792
} SStreamPartitionOperatorInfo;

5
54liuyao 已提交
793 794 795 796 797 798 799 800 801 802
typedef struct SStreamFillOperatorInfo {
  SStreamFillSupporter* pFillSup;
  SSDataBlock*      pRes;
  SSDataBlock*      pSrcBlock;
  int32_t           srcRowIndex;
  SSDataBlock*      pPrevSrcBlock;
  SSDataBlock*      pSrcDelBlock;
  int32_t           srcDelRowIndex;
  SSDataBlock*      pDelRes;
  SNode*            pCondition;
H
Haojun Liao 已提交
803
  SColMatchInfo     matchInfo;
5
54liuyao 已提交
804 805 806 807 808
  int32_t           primaryTsCol;
  int32_t           primarySrcSlotId;
  SStreamFillInfo*  pFillInfo;
} SStreamFillOperatorInfo;

809
typedef struct STimeSliceOperatorInfo {
L
Liu Jicong 已提交
810 811 812 813 814 815 816 817 818 819 820 821 822 823
  SSDataBlock*         pRes;
  STimeWindow          win;
  SInterval            interval;
  int64_t              current;
  SArray*              pPrevRow;     // SArray<SGroupValue>
  SArray*              pNextRow;     // SArray<SGroupValue>
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
  bool                 fillLastPoint;
  bool                 isPrevRowSet;
  bool                 isNextRowSet;
  int32_t              fillType;      // fill type
  SColumn              tsCol;         // primary timestamp column
  SExprSupp            scalarSup;     // scalar calculation
  struct SFillColInfo* pFillColInfo;  // fill column info
824 825
} STimeSliceOperatorInfo;

826
typedef struct SStateWindowOperatorInfo {
wmmhello's avatar
wmmhello 已提交
827
  // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
L
Liu Jicong 已提交
828 829
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
830
  SExprSupp      scalarSup;
wmmhello's avatar
wmmhello 已提交
831

832 833
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
834
  SColumn            stateCol;  // start row index
835 836
  bool               hasKey;
  SStateKeys         stateKey;
837
  int32_t            tsSlotId;  // primary timestamp column slot id
838
  STimeWindowAggSupp twAggSup;
L
Liu Jicong 已提交
839
  const SNode* pCondition;
840 841
} SStateWindowOperatorInfo;

H
Haojun Liao 已提交
842
typedef struct SSortOperatorInfo {
843
  SOptrBasicInfo binfo;
L
Liu Jicong 已提交
844 845 846
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
  SArray*        pSortInfo;
  SSortHandle*   pSortHandle;
H
Haojun Liao 已提交
847
  SColMatchInfo  matchInfo;
L
Liu Jicong 已提交
848
  int32_t        bufPageSize;
849 850 851 852
  int64_t        startTs;      // sort start time
  uint64_t       sortElapsed;  // sort elapsed time, time to flush to disk not included.
  SLimitInfo     limitInfo;
  SNode*         pCondition;
H
Haojun Liao 已提交
853
} SSortOperatorInfo;
854

dengyihao's avatar
dengyihao 已提交
855 856 857
typedef struct STagFilterOperatorInfo {
  SOptrBasicInfo binfo;
} STagFilterOperatorInfo;
858

859
typedef struct SJoinOperatorInfo {
L
Liu Jicong 已提交
860 861 862 863 864 865 866 867 868 869 870 871
  SSDataBlock* pRes;
  int32_t      joinType;
  int32_t      inputOrder;

  SSDataBlock* pLeft;
  int32_t      leftPos;
  SColumnInfo  leftCol;

  SSDataBlock* pRight;
  int32_t      rightPos;
  SColumnInfo  rightCol;
  SNode*       pCondAfterMerge;
872 873
} SJoinOperatorInfo;

874 875 876
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)

877 878
void doDestroyExchangeOperatorInfo(void* param);

879
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
880
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
881

H
Haojun Liao 已提交
882 883
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
884 885 886

void    initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void    cleanupBasicInfo(SOptrBasicInfo* pInfo);
887
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
888
void    cleanupExprSupp(SExprSupp* pSup);
889
void    destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
L
Liu Jicong 已提交
890
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
891
                    const char* pkey);
L
Liu Jicong 已提交
892
void    initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
893 894 895

void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf);
L
Liu Jicong 已提交
896 897
void    doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                               SDiskbasedBuf* pBuf);
898

L
Liu Jicong 已提交
899
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
900 901
bool    hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void    initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
902
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
903

L
Liu Jicong 已提交
904 905
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
906

H
Haojun Liao 已提交
907
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
L
Liu Jicong 已提交
908 909
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                             SOperatorInfo* pOperator);
910

911 912
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);

L
Liu Jicong 已提交
913
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
914
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
915

dengyihao's avatar
dengyihao 已提交
916
void    doSetOperatorCompleted(SOperatorInfo* pOperator);
H
Haojun Liao 已提交
917
void    doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
918 919
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                               SSDataBlock* pBlock, const char* idStr);
920

L
Liu Jicong 已提交
921 922 923
void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
924

L
Liu Jicong 已提交
925
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
926
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
927

928
SSDataBlock* loadNextDataBlock(void* param);
929

930
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
931

L
Liu Jicong 已提交
932 933 934
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup);
H
Haojun Liao 已提交
935

936 937
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);

L
Liu Jicong 已提交
938 939
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                           SExecTaskInfo* pTaskInfo);
dengyihao's avatar
dengyihao 已提交
940 941
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
942 943
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo);
944

H
Haojun Liao 已提交
945
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
946

L
Liu Jicong 已提交
947 948 949 950
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
                                         SExecTaskInfo* pTaskInfo);
951
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
952 953 954 955
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
                                               SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo);
956

H
Haojun Liao 已提交
957
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
L
Liu Jicong 已提交
958
                                          SExecTaskInfo* pTaskInfo, bool isStream);
959 960 961 962
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
                                                      SExecTaskInfo* pTaskInfo);
963 964
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild);
H
Haojun Liao 已提交
965 966
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
                                            SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
967
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo);
968 969
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
970

971
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
972
                                            SExecTaskInfo* pTaskInfo);
973

974 975
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);

L
Liu Jicong 已提交
976 977 978 979 980
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
981
                                           SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
982

983
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
L
Liu Jicong 已提交
984
                                                 SExecTaskInfo* pTaskInfo);
985

986
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
987 988
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
989

L
Liu Jicong 已提交
990 991 992 993
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild);
994 995
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream,
                                                SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
996

L
Liu Jicong 已提交
997 998
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
999 1000
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
                                            SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
1001

1002
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
L
Liu Jicong 已提交
1003
                              int32_t numOfOutput, SArray* pPseudoList);
1004

1005
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
X
Xiaoyu Wang 已提交
1006

1007
bool    isTaskKilled(SExecTaskInfo* pTaskInfo);
1008 1009
int32_t checkForQueryBuf(size_t numOfTables);

L
Liu Jicong 已提交
1010 1011
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
1012

dengyihao's avatar
dengyihao 已提交
1013
void    doDestroyTask(SExecTaskInfo* pTaskInfo);
1014 1015
int32_t getMaximumIdleDurationSec();

wmmhello's avatar
wmmhello 已提交
1016 1017 1018 1019
/*
 * ops:     root operator
 * data:    *data save the result of encode, need to be freed by caller
 * length:  *length save the length of *data
C
Cary Xu 已提交
1020
 * nOptrWithVal: *nOptrWithVal save the number of optr with value
wmmhello's avatar
wmmhello 已提交
1021 1022
 * return:  result code, 0 means success
 */
L
Liu Jicong 已提交
1023
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal);
wmmhello's avatar
wmmhello 已提交
1024 1025 1026 1027 1028 1029 1030

/*
 * ops:    root operator, created by caller
 * data:   save the result of decode
 * length: the length of data
 * return: result code, 0 means success
 */
H
Haojun Liao 已提交
1031
int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
wmmhello's avatar
wmmhello 已提交
1032

1033
void    setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
dengyihao's avatar
dengyihao 已提交
1034
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
D
dapan1121 已提交
1035
                               char* sql, EOPTR_EXEC_MODEL model);
L
Liu Jicong 已提交
1036
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
H
Haojun Liao 已提交
1037
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
dengyihao's avatar
dengyihao 已提交
1038

1039
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
wmmhello's avatar
wmmhello 已提交
1040 1041
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);

1042
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
1043
                                int32_t order);
1044
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
L
Liu Jicong 已提交
1045
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
5
54liuyao 已提交
1046
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
L
Liu Jicong 已提交
1047
SResultRow*        getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
5
54liuyao 已提交
1048
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
L
Liu Jicong 已提交
1049 1050 1051 1052 1053
bool               isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool               functionNeedToExecute(SqlFunctionCtx* pCtx);
bool               isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool               isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool               isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
1054
bool               isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
1055
void               appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName);
L
Liu Jicong 已提交
1056
void               printDataBlock(SSDataBlock* pBlock, const char* flag);
1057
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
1058

1059 1060
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
                                             SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
1061

L
Liu Jicong 已提交
1062 1063 1064
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                const char* idstr);
1065

S
shenglian zhou 已提交
1066 1067
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
                                           SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
1068
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
1069
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
1070

1071
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
1072

L
Liu Jicong 已提交
1073
bool    groupbyTbname(SNodeList* pGroupList);
wmmhello's avatar
wmmhello 已提交
1074
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
L
Liu Jicong 已提交
1075
void*   destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
1076
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
1077
                                   SGroupResInfo* pGroupResInfo);
5
54liuyao 已提交
1078
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
1079
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1080
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
1081 1082 1083 1084
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
                     int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
5
54liuyao 已提交
1085
void    getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
1086

1087 1088 1089 1090
#ifdef __cplusplus
}
#endif

1091
#endif  // TDENGINE_EXECUTORIMPL_H