executorimpl.h 37.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H

18 19 20 21
#ifdef __cplusplus
extern "C" {
#endif

22
#include "os.h"
S
common  
Shengliang Guan 已提交
23
#include "tcommon.h"
24
#include "tlosertree.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tsort.h"
26 27 28
#include "ttszip.h"
#include "tvariant.h"

H
Haojun Liao 已提交
29
#include "dataSinkMgt.h"
30
#include "executil.h"
H
Haojun Liao 已提交
31
#include "executor.h"
H
Haojun Liao 已提交
32
#include "planner.h"
D
dapan1121 已提交
33
#include "scalar.h"
34 35
#include "taosdef.h"
#include "tarray.h"
5
54liuyao 已提交
36
#include "tfill.h"
H
Haojun Liao 已提交
37
#include "thash.h"
38
#include "tlockfree.h"
D
dapan1121 已提交
39
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
40
#include "tpagedbuf.h"
L
Liu Jicong 已提交
41
#include "tstream.h"
L
Liu Jicong 已提交
42
#include "tstreamUpdate.h"
H
Haojun Liao 已提交
43

44
#include "executorInt.h"
L
Liu Jicong 已提交
45
#include "vnode.h"
H
Hongze Cheng 已提交
46

47 48
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);

L
Liu Jicong 已提交
49
#define Q_STATUS_EQUAL(p, s)                 (((p) & (s)) != 0u)
5
54liuyao 已提交
50 51 52 53 54
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)

55
enum {
56 57
  // when this task starts to execute, this status will set
  TASK_NOT_COMPLETED = 0x1u,
58

59
  /* Task is over
60 61 62
   * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
   * 2. when all data within queried time window, it is also denoted as query_completed
   */
63
  TASK_COMPLETED = 0x2u,
64 65 66 67 68 69
};

/**
 * If the number of generated results is greater than this value,
 * query query will be halt and return results to client immediate.
 */
dengyihao's avatar
dengyihao 已提交
70 71 72 73 74
typedef struct SResultInfo {  // TODO refactor
  int64_t totalRows;          // total generated result size in rows
  int64_t totalBytes;         // total results in bytes.
  int32_t capacity;           // capacity of current result output buffer
  int32_t threshold;          // result size threshold in rows.
H
Haojun Liao 已提交
75
} SResultInfo;
76 77

typedef struct STableQueryInfo {
L
Liu Jicong 已提交
78 79
  TSKEY              lastKey;  // last check ts, todo remove it later
  SResultRowPosition pos;      // current active time window
80 81
} STableQueryInfo;

H
Haojun Liao 已提交
82 83 84 85 86
typedef struct SLimit {
  int64_t limit;
  int64_t offset;
} SLimit;

87
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
88

H
Haojun Liao 已提交
89
typedef struct STaskCostInfo {
90 91 92 93 94
  int64_t                 created;
  int64_t                 start;
  uint64_t                elapsedTime;
  double                  extractListTime;
  double                  groupIdMapTime;
95
  SFileBlockLoadRecorder* pRecoder;
H
Haojun Liao 已提交
96
} STaskCostInfo;
97

H
Haojun Liao 已提交
98
typedef struct SOperatorCostInfo {
L
Liu Jicong 已提交
99 100
  double openCost;
  double totalCost;
H
Haojun Liao 已提交
101 102
} SOperatorCostInfo;

103 104
struct SOperatorInfo;

wmmhello's avatar
wmmhello 已提交
105
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
106
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
107

108
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
109
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
110
typedef void (*__optr_close_fn_t)(void* param);
111
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
H
Haojun Liao 已提交
112

H
Haojun Liao 已提交
113
typedef struct STaskIdInfo {
114 115 116 117
  uint64_t queryId;  // this is also a request id
  uint64_t subplanId;
  uint64_t templateId;
  char*    str;
H
Haojun Liao 已提交
118 119
} STaskIdInfo;

L
Liu Jicong 已提交
120 121
enum {
  STREAM_RECOVER_STEP__NONE = 0,
122 123
  STREAM_RECOVER_STEP__PREPARE1,
  STREAM_RECOVER_STEP__PREPARE2,
L
Liu Jicong 已提交
124 125 126
  STREAM_RECOVER_STEP__SCAN,
};

L
Liu Jicong 已提交
127
typedef struct {
L
Liu Jicong 已提交
128
  // TODO remove prepareStatus
129 130 131 132 133 134
  STqOffsetVal      prepareStatus;  // for tmq
  STqOffsetVal      lastStatus;     // for tmq
  SMqMetaRsp        metaRsp;        // for tmq fetching meta
  int8_t            returned;
  int64_t           snapshotVer;
  const SSubmitReq* pReq;
L
Liu Jicong 已提交
135 136 137 138

  SSchemaWrapper*     schema;
  char                tbName[TSDB_TABLE_NAME_LEN];
  int8_t              recoverStep;
L
Liu Jicong 已提交
139
  SQueryTableDataCond tableCond;
L
Liu Jicong 已提交
140 141
  int64_t             fillHistoryVer1;
  int64_t             fillHistoryVer2;
142

L
Liu Jicong 已提交
143 144
  // int8_t        triggerSaved;
  // int64_t       deleteMarkSaved;
145
  SStreamState* pState;
L
Liu Jicong 已提交
146 147
} SStreamTaskInfo;

148 149 150 151 152
typedef struct {
  char*           tablename;
  char*           dbname;
  int32_t         tversion;
  SSchemaWrapper* sw;
153
  SSchemaWrapper* qsw;
154 155
} SSchemaInfo;

H
Haojun Liao 已提交
156
typedef struct SExchangeOpStopInfo {
H
Haojun Liao 已提交
157 158 159 160
  int32_t operatorType;
  int64_t refId;
} SExchangeOpStopInfo;

H
Haojun Liao 已提交
161
typedef struct STaskStopInfo {
H
Haojun Liao 已提交
162 163 164 165
  SRWLatch lock;
  SArray*  pStopInfo;
} STaskStopInfo;

H
Haojun Liao 已提交
166
struct SExecTaskInfo {
167 168 169 170 171 172 173
  STaskIdInfo   id;
  uint32_t      status;
  STimeWindow   window;
  STaskCostInfo cost;
  int64_t       owner;  // if it is in execution
  int32_t       code;

L
Liu Jicong 已提交
174 175 176
  int64_t               version;  // used for stream to record wal version
  SStreamTaskInfo       streamInfo;
  SSchemaInfo           schemaInfo;
H
Haojun Liao 已提交
177
  STableListInfo*       pTableInfoList;  // this is a table list
L
Liu Jicong 已提交
178 179 180 181
  const char*           sql;             // query sql string
  jmp_buf               env;             // jump to this position when error happens.
  EOPTR_EXEC_MODEL      execModel;       // operator execution model [batch model|stream model]
  SSubplan*             pSubplan;
182
  struct SOperatorInfo* pRoot;
183
  SLocalFetch           localFetch;
H
Haojun Liao 已提交
184
  SArray*               pResultBlockList;// result block list
D
dapan1121 已提交
185
  STaskStopInfo         stopInfo;
H
Haojun Liao 已提交
186
};
H
Haojun Liao 已提交
187

188
enum {
L
Liu Jicong 已提交
189 190
  OP_NOT_OPENED = 0x0,
  OP_OPENED = 0x1,
H
Haojun Liao 已提交
191
  OP_RES_TO_RETURN = 0x5,
L
Liu Jicong 已提交
192
  OP_EXEC_DONE = 0x9,
L
Liu Jicong 已提交
193
  OP_EXEC_RECV = 0x11,
194 195
};

196
typedef struct SOperatorFpSet {
L
Liu Jicong 已提交
197 198
  __optr_open_fn_t    _openFn;  // DO NOT invoke this function directly
  __optr_fn_t         getNextFn;
199
  __optr_fn_t         cleanupFn;  // call this function to release the allocated resources ASAP
L
Liu Jicong 已提交
200 201 202 203
  __optr_close_fn_t   closeFn;
  __optr_encode_fn_t  encodeResultRow;
  __optr_decode_fn_t  decodeResultRow;
  __optr_explain_fn_t getExplainFn;
204 205
} SOperatorFpSet;

206 207
typedef struct SExprSupp {
  SExprInfo*      pExprInfo;
L
Liu Jicong 已提交
208
  int32_t         numOfExprs;  // the number of scalar expression in group operator
209 210
  SqlFunctionCtx* pCtx;
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
H
Haojun Liao 已提交
211
  SFilterInfo*    pFilterInfo;
212 213
} SExprSupp;

214
typedef struct SOperatorInfo {
215
  uint16_t               operatorType;
216
  int16_t                resultDataBlockId;
L
Liu Jicong 已提交
217 218 219 220 221 222 223 224 225 226 227
  bool                   blocking;  // block operator or not
  uint8_t                status;    // denote if current operator is completed
  char*                  name;      // name, for debug purpose
  void*                  info;      // extension attribution
  SExprSupp              exprSupp;
  SExecTaskInfo*         pTaskInfo;
  SOperatorCostInfo      cost;
  SResultInfo            resultInfo;
  struct SOperatorInfo** pDownstream;      // downstram pointer list
  int32_t                numOfDownstream;  // number of downstream. The value is always ONE expect for join operator
  SOperatorFpSet         fpSet;
228 229
} SOperatorInfo;

230 231
typedef enum {
  EX_SOURCE_DATA_NOT_READY = 0x1,
L
Liu Jicong 已提交
232
  EX_SOURCE_DATA_READY = 0x2,
233 234
  EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
235

236 237 238
#define COL_MATCH_FROM_COL_ID  0x1
#define COL_MATCH_FROM_SLOT_ID 0x2

H
Haojun Liao 已提交
239
typedef struct SLoadRemoteDataInfo {
dengyihao's avatar
dengyihao 已提交
240 241 242
  uint64_t totalSize;     // total load bytes from remote
  uint64_t totalRows;     // total number of rows
  uint64_t totalElapsed;  // total elapsed time
H
Haojun Liao 已提交
243 244
} SLoadRemoteDataInfo;

245
typedef struct SLimitInfo {
L
Liu Jicong 已提交
246 247 248 249 250 251 252
  SLimit   limit;
  SLimit   slimit;
  uint64_t currentGroupId;
  int64_t  remainGroupOffset;
  int64_t  numOfOutputGroups;
  int64_t  remainOffset;
  int64_t  numOfOutputRows;
253 254
} SLimitInfo;

255
typedef struct SExchangeInfo {
256 257 258 259
  SArray*             pSources;
  SArray*             pSourceDataInfo;
  tsem_t              ready;
  void*               pTransporter;
H
Haojun Liao 已提交
260

261 262
  // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
  // passed by downstream operator
H
Haojun Liao 已提交
263
  SArray*             pResultBlockList;
H
Haojun Liao 已提交
264
  SArray*             pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
L
Liu Jicong 已提交
265 266
  SSDataBlock*        pDummyBlock;    // dummy block, not keep data
  bool                seqLoadData;    // sequential load data or not, false by default
dengyihao's avatar
dengyihao 已提交
267
  int32_t             current;
H
Haojun Liao 已提交
268
  SLoadRemoteDataInfo loadInfo;
269
  uint64_t            self;
270
  SLimitInfo          limitInfo;
H
Haojun Liao 已提交
271
  int64_t             openedTs;       // start exec time stamp, todo: move to SLoadRemoteDataInfo
272 273
} SExchangeInfo;

274
typedef struct SScanInfo {
L
Liu Jicong 已提交
275 276
  int32_t numOfAsc;
  int32_t numOfDesc;
277 278
} SScanInfo;

279
typedef struct SSampleExecInfo {
L
Liu Jicong 已提交
280 281
  double   sampleRatio;  // data block sample ratio, 1 by default
  uint32_t seed;         // random seed value
282 283
} SSampleExecInfo;

L
Liu Jicong 已提交
284 285
enum {
  TABLE_SCAN__TABLE_ORDER = 1,
286
  TABLE_SCAN__BLOCK_ORDER = 2,
L
Liu Jicong 已提交
287 288
};

289
typedef struct SAggSupporter {
H
Haojun Liao 已提交
290
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
291 292
  char*          keyBuf;               // window key buffer
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
L
Liu Jicong 已提交
293 294
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  int32_t        currentPageId;  // current write page id
295 296 297
} SAggSupporter;

typedef struct {
L
Liu Jicong 已提交
298 299 300 301 302
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
  // current data block needs to be loaded.
  SInterval      interval;
  SAggSupporter* pAggSup;
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
303 304
} SAggOptrPushDownInfo;

305 306 307 308 309 310
typedef struct STableMetaCacheInfo {
  SLRUCache*             pTableMetaEntryCache; // 100 by default
  uint64_t               metaFetch;
  uint64_t               cacheHit;
} STableMetaCacheInfo;

H
Haojun Liao 已提交
311
typedef struct STableScanBase {
312
  STsdbReader*           dataReader;
313
  SFileBlockLoadRecorder readRecorder;
H
Haojun Liao 已提交
314 315
  SQueryTableDataCond    cond;
  SAggOptrPushDownInfo   pdInfo;
316 317 318
  SColMatchInfo          matchInfo;
  SReadHandle            readHandle;
  SExprSupp              pseudoSup;
H
Haojun Liao 已提交
319
  STableMetaCacheInfo    metaCache;
320 321
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t                dataBlockLoadFlag;
H
Haojun Liao 已提交
322
  SLimitInfo             limitInfo;
H
Haojun Liao 已提交
323 324 325 326
} STableScanBase;

typedef struct STableScanInfo {
  STableScanBase         base;
L
Liu Jicong 已提交
327 328
  SScanInfo              scanInfo;
  int32_t                scanTimes;
329 330 331 332 333 334
  SSDataBlock*           pResBlock;
  SSampleExecInfo        sample;  // sample execution info
  int32_t                currentGroupId;
  int32_t                currentTable;
  int8_t                 scanMode;
  int8_t                 assignBlockUid;
335 336
} STableScanInfo;

337
typedef struct STableMergeScanInfo {
dengyihao's avatar
opt mem  
dengyihao 已提交
338 339 340 341
  int32_t                tableStartIndex;
  int32_t                tableEndIndex;
  bool                   hasGroupId;
  uint64_t               groupId;
L
Liu Jicong 已提交
342
  SArray*                queryConds;  // array of queryTableDataCond
H
Haojun Liao 已提交
343
  STableScanBase         base;
dengyihao's avatar
opt mem  
dengyihao 已提交
344 345 346 347 348 349 350 351
  int32_t                bufPageSize;
  uint32_t               sortBufSize;  // max buffer size for in-memory sort
  SArray*                pSortInfo;
  SSortHandle*           pSortHandle;
  SSDataBlock*           pSortInputBlock;
  int64_t                startTs;  // sort start time
  SArray*                sortSourceParams;
  SLimitInfo             limitInfo;
352 353 354 355
  int64_t                numOfRows;
  SScanInfo              scanInfo;
  int32_t                scanTimes;
  SSDataBlock*           pResBlock;
H
Haojun Liao 已提交
356 357
  SSampleExecInfo        sample;  // sample execution info
  SSortExecInfo          sortExecInfo;
358 359
} STableMergeScanInfo;

360
typedef struct STagScanInfo {
L
Liu Jicong 已提交
361 362
  SColumnInfo*    pCols;
  SSDataBlock*    pRes;
H
Haojun Liao 已提交
363
  SColMatchInfo   matchInfo;
L
Liu Jicong 已提交
364 365
  int32_t         curPos;
  SReadHandle     readHandle;
366 367
} STagScanInfo;

5
54liuyao 已提交
368 369 370 371
typedef enum EStreamScanMode {
  STREAM_SCAN_FROM_READERHANDLE = 1,
  STREAM_SCAN_FROM_RES,
  STREAM_SCAN_FROM_UPDATERES,
372
  STREAM_SCAN_FROM_DELETE_DATA,
5
54liuyao 已提交
373
  STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
L
Liu Jicong 已提交
374
  STREAM_SCAN_FROM_DATAREADER_RANGE,
5
54liuyao 已提交
375 376
} EStreamScanMode;

377 378 379 380 381
enum {
  PROJECT_RETRIEVE_CONTINUE = 0x1,
  PROJECT_RETRIEVE_DONE = 0x2,
};

5
54liuyao 已提交
382
typedef struct SStreamAggSupporter {
5
54liuyao 已提交
383 384 385
  int32_t         resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  SSDataBlock*    pScanBlock;
  SStreamState*   pState;
386 387
  int64_t         gap;        // stream session window gap
  SqlFunctionCtx* pDummyCtx;  // for combine
5
54liuyao 已提交
388 389 390 391
  SSHashObj*      pResultRows;
  int32_t         stateKeySize;
  int16_t         stateKeyType;
  SDiskbasedBuf*  pResultBuf;
5
54liuyao 已提交
392 393
} SStreamAggSupporter;

394
typedef struct SWindowSupporter {
5
54liuyao 已提交
395
  SStreamAggSupporter* pStreamAggSup;
L
Liu Jicong 已提交
396
  int64_t              gap;
397
  uint16_t             parentType;
398
  SAggSupporter*       pIntervalAggSup;
399
} SWindowSupporter;
400

401
typedef struct SPartitionBySupporter {
L
Liu Jicong 已提交
402 403 404 405
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
  char*   keyBuf;         // group by keys for hash
  bool    needCalc;       // partition by column
406 407 408
} SPartitionBySupporter;

typedef struct SPartitionDataInfo {
L
Liu Jicong 已提交
409
  uint64_t groupId;
410 411
  char*    tbname;
  SArray*  tags;
L
Liu Jicong 已提交
412
  SArray*  rowIds;
413
} SPartitionDataInfo;
414

5
54liuyao 已提交
415
typedef struct STimeWindowAggSupp {
L
Liu Jicong 已提交
416
  int8_t          calTrigger;
L
Liu Jicong 已提交
417
  int8_t          calTriggerSaved;
5
54liuyao 已提交
418
  int64_t         deleteMark;
L
Liu Jicong 已提交
419 420
  int64_t         deleteMarkSaved;
  int64_t         waterMark;
L
Liu Jicong 已提交
421
  TSKEY           maxTs;
5
54liuyao 已提交
422
  TSKEY           minTs;
L
Liu Jicong 已提交
423
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
5
54liuyao 已提交
424 425
} STimeWindowAggSupp;

426
typedef struct SStreamScanInfo {
L
Liu Jicong 已提交
427 428 429 430 431 432 433 434 435
  uint64_t      tableUid;  // queried super table uid
  SExprInfo*    pPseudoExpr;
  int32_t       numOfPseudoExpr;
  SExprSupp     tbnameCalSup;
  SExprSupp     tagCalSup;
  int32_t       primaryTsIndex;  // primary time stamp slot id
  SReadHandle   readHandle;
  SInterval     interval;  // if the upstream is an interval operator, the interval info is also kept here.
  SColMatchInfo matchInfo;
L
Liu Jicong 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448

  SArray*      pBlockLists;  // multiple SSDatablock.
  SSDataBlock* pRes;         // result SSDataBlock
  SSDataBlock* pUpdateRes;   // update SSDataBlock
  int32_t      updateResIndex;
  int32_t      blockType;        // current block type
  int32_t      validBlockIndex;  // Is current data has returned?
  uint64_t     numOfExec;        // execution times
  STqReader*   tqReader;

  uint64_t     groupId;
  SUpdateInfo* pUpdateInfo;

L
Liu Jicong 已提交
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
  EStreamScanMode       scanMode;
  SOperatorInfo*        pStreamScanOp;
  SOperatorInfo*        pTableScanOp;
  SArray*               childIds;
  SWindowSupporter      windowSup;
  SPartitionBySupporter partitionSup;
  SExprSupp*            pPartScalarSup;
  bool                  assignBlockUid;  // assign block uid to groupId, temporarily used for generating rollup SMA.
  int32_t               scanWinIndex;    // for state operator
  int32_t               pullDataResIndex;
  SSDataBlock*          pPullDataRes;    // pull data SSDataBlock
  SSDataBlock*          pDeleteDataRes;  // delete data SSDataBlock
  int32_t               deleteDataIndex;
  STimeWindow           updateWin;
  STimeWindowAggSupp    twAggSup;
  SSDataBlock*          pUpdateDataRes;
L
Liu Jicong 已提交
465
  // status for tmq
L
Liu Jicong 已提交
466 467 468
  SNodeList* pGroupTags;
  SNode*     pTagCond;
  SNode*     pTagIndexCond;
469
} SStreamScanInfo;
H
Haojun Liao 已提交
470

L
Liu Jicong 已提交
471 472 473 474 475 476 477 478 479 480 481 482
typedef struct {
  //  int8_t    subType;
  //  bool      withMeta;
  //  int64_t   suid;
  //  int64_t   snapVersion;
  //  void     *metaInfo;
  //  void     *dataInfo;
  SVnode*       vnode;
  SSDataBlock   pRes;  // result SSDataBlock
  STsdbReader*  dataReader;
  SSnapContext* sContext;
} SStreamRawScanInfo;
483

484
typedef struct SOptrBasicInfo {
L
Liu Jicong 已提交
485 486 487
  SResultRowInfo resultRowInfo;
  SSDataBlock*   pRes;
  bool           mergeResultBlock;
488 489
} SOptrBasicInfo;

490
typedef struct SIntervalAggOperatorInfo {
491
  SOptrBasicInfo     binfo;              // basic info
wmmhello's avatar
wmmhello 已提交
492
  SAggSupporter      aggSup;             // aggregate supporter
493
  SExprSupp          scalarSupp;         // supporter for perform scalar function
494 495 496 497 498
  SGroupResInfo      groupResInfo;       // multiple results build supporter
  SInterval          interval;           // interval info
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
  STimeWindow        win;                // query time range
  bool               timeWindowInterpo;  // interpolation needed or not
499
  SArray*            pInterpCols;        // interpolation columns
500
  int32_t            resultTsOrder;      // result timestamp order
501
  int32_t            inputOrder;         // input data ts order
502 503
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
  STimeWindowAggSupp twAggSup;
L
Liu Jicong 已提交
504
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
505
} SIntervalAggOperatorInfo;
506

507
typedef struct SMergeAlignedIntervalAggOperatorInfo {
L
Liu Jicong 已提交
508
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
509

510 511
  uint64_t     groupId;  // current groupId
  int64_t      curTs;    // current ts
512
  SSDataBlock* prefetchedBlock;
513
  SResultRow*  pResultRow;
514 515
} SMergeAlignedIntervalAggOperatorInfo;

516
typedef struct SStreamIntervalOperatorInfo {
517 518 519 520 521 522
  SOptrBasicInfo     binfo;           // basic info
  SAggSupporter      aggSup;          // aggregate supporter
  SExprSupp          scalarSupp;      // supporter for perform scalar function
  SGroupResInfo      groupResInfo;    // multiple results build supporter
  SInterval          interval;        // interval info
  int32_t            primaryTsIndex;  // primary time stamp slot id from result of downstream operator.
523 524 525
  STimeWindowAggSupp twAggSup;
  bool               invertible;
  bool               ignoreExpiredData;
526
  SArray*            pDelWins;  // SWinRes
527 528
  int32_t            delIndex;
  SSDataBlock*       pDelRes;
529
  SPhysiNode*        pPhyNode;  // create new child
5
54liuyao 已提交
530
  SHashObj*          pPullDataMap;
531
  SArray*            pPullWins;  // SPullWindowInfo
5
54liuyao 已提交
532 533
  int32_t            pullIndex;
  SSDataBlock*       pPullDataRes;
534 535 536 537 538
  bool               isFinal;
  SArray*            pChildren;
  SStreamState*      pState;
  SWinKey            delKey;
} SStreamIntervalOperatorInfo;
5
54liuyao 已提交
539

540
typedef struct SAggOperatorInfo {
H
Haojun Liao 已提交
541 542
  SOptrBasicInfo   binfo;
  SAggSupporter    aggSup;
L
Liu Jicong 已提交
543 544 545 546
  STableQueryInfo* current;
  uint64_t         groupId;
  SGroupResInfo    groupResInfo;
  SExprSupp        scalarExprSup;
547 548 549
} SAggOperatorInfo;

typedef struct SFillOperatorInfo {
550 551
  struct SFillInfo* pFillInfo;
  SSDataBlock*      pRes;
H
Haojun Liao 已提交
552
  SSDataBlock*      pFinalRes;
553 554 555
  int64_t           totalInputRows;
  void**            p;
  SSDataBlock*      existNewGroupBlock;
556
  STimeWindow       win;
H
Haojun Liao 已提交
557
  SColMatchInfo     matchInfo;
H
Haojun Liao 已提交
558
  int32_t           primaryTsCol;
H
Haojun Liao 已提交
559
  int32_t           primarySrcSlotId;
L
Liu Jicong 已提交
560
  uint64_t          curGroupId;  // current handled group id
561 562
  SExprInfo*        pExprInfo;
  int32_t           numOfExpr;
563
  SExprSupp         noFillExprSupp;
564 565
} SFillOperatorInfo;

H
Haojun Liao 已提交
566
typedef struct SDataGroupInfo {
L
Liu Jicong 已提交
567 568 569
  uint64_t groupId;
  int64_t  numOfRows;
  SArray*  pPageList;
H
Haojun Liao 已提交
570 571
} SDataGroupInfo;

572
typedef struct SWindowRowsSup {
dengyihao's avatar
dengyihao 已提交
573 574 575 576
  STimeWindow win;
  TSKEY       prevTs;
  int32_t     startRowIndex;
  int32_t     numOfRows;
577
  uint64_t    groupId;
578 579
} SWindowRowsSup;

H
Haojun Liao 已提交
580
typedef struct SSessionAggOperatorInfo {
L
Liu Jicong 已提交
581 582
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
wmmhello's avatar
wmmhello 已提交
583

584 585
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
L
Liu Jicong 已提交
586 587 588
  bool               reptScan;  // next round scan
  int64_t            gap;       // session window gap
  int32_t            tsSlotId;  // primary timestamp slot id
589
  STimeWindowAggSupp twAggSup;
H
Haojun Liao 已提交
590
} SSessionAggOperatorInfo;
591

5
54liuyao 已提交
592
typedef struct SResultWindowInfo {
593 594 595
  void*       pOutputBuf;
  SSessionKey sessionWin;
  bool        isOutput;
5
54liuyao 已提交
596 597
} SResultWindowInfo;

5
54liuyao 已提交
598 599
typedef struct SStateWindowInfo {
  SResultWindowInfo winInfo;
5
54liuyao 已提交
600
  SStateKeys*       pStateKey;
5
54liuyao 已提交
601 602
} SStateWindowInfo;

5
54liuyao 已提交
603
typedef struct SStreamSessionAggOperatorInfo {
L
Liu Jicong 已提交
604 605
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
606
  SExprSupp           scalarSupp;  // supporter for perform scalar function
L
Liu Jicong 已提交
607 608 609 610 611
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  int32_t             endTsIndex;      // window end timestamp slot id
  int32_t             order;           // current SSDataBlock scan order
  STimeWindowAggSupp  twAggSup;
612 613 614
  SSDataBlock*        pWinBlock;   // window result
  SSDataBlock*        pDelRes;     // delete result
  SSDataBlock*        pUpdateRes;  // update window
L
Liu Jicong 已提交
615
  bool                returnUpdate;
5
54liuyao 已提交
616
  SSHashObj*          pStDeleted;
L
Liu Jicong 已提交
617
  void*               pDelIterator;
618 619
  SArray*             pChildren;  // cache for children's result; final stream operator
  SPhysiNode*         pPhyNode;   // create new child
L
Liu Jicong 已提交
620 621
  bool                isFinal;
  bool                ignoreExpiredData;
5
54liuyao 已提交
622 623
} SStreamSessionAggOperatorInfo;

5
54liuyao 已提交
624 625 626
typedef struct SStreamStateAggOperatorInfo {
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
627
  SExprSupp           scalarSupp;  // supporter for perform scalar function
5
54liuyao 已提交
628 629 630 631 632 633 634
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  STimeWindowAggSupp  twAggSup;
  SColumn             stateCol;
  SSDataBlock*        pDelRes;
  SSHashObj*          pSeDeleted;
  void*               pDelIterator;
635
  SArray*             pChildren;  // cache for children's result;
5
54liuyao 已提交
636 637 638
  bool                ignoreExpiredData;
} SStreamStateAggOperatorInfo;

639 640 641 642
typedef struct SStreamPartitionOperatorInfo {
  SOptrBasicInfo        binfo;
  SPartitionBySupporter partitionSup;
  SExprSupp             scalarSup;
643
  SExprSupp             tbnameCalSup;
L
Liu Jicong 已提交
644
  SExprSupp             tagCalSup;
645 646 647 648
  SHashObj*             pPartitions;
  void*                 parIte;
  SSDataBlock*          pInputDataBlock;
  int32_t               tsColIndex;
649
  SSDataBlock*          pDelRes;
650 651
} SStreamPartitionOperatorInfo;

5
54liuyao 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
typedef struct SStreamFillSupporter {
  int32_t        type;  // fill type
  SInterval      interval;
  SResultRowData prev;
  SResultRowData cur;
  SResultRowData next;
  SResultRowData nextNext;
  SFillColInfo*  pAllColInfo;  // fill exprs and not fill exprs
  SExprSupp      notFillExprSup;
  int32_t        numOfAllCols;  // number of all exprs, including the tags columns
  int32_t        numOfFillCols;
  int32_t        numOfNotFillCols;
  int32_t        rowSize;
  SSHashObj*     pResMap;
  bool           hasDelete;
} SStreamFillSupporter;

5
54liuyao 已提交
669 670
typedef struct SStreamFillOperatorInfo {
  SStreamFillSupporter* pFillSup;
671 672 673 674 675 676 677
  SSDataBlock*          pRes;
  SSDataBlock*          pSrcBlock;
  int32_t               srcRowIndex;
  SSDataBlock*          pPrevSrcBlock;
  SSDataBlock*          pSrcDelBlock;
  int32_t               srcDelRowIndex;
  SSDataBlock*          pDelRes;
L
Liu Jicong 已提交
678
  SColMatchInfo         matchInfo;
679 680 681
  int32_t               primaryTsCol;
  int32_t               primarySrcSlotId;
  SStreamFillInfo*      pFillInfo;
5
54liuyao 已提交
682 683
} SStreamFillOperatorInfo;

684
typedef struct STimeSliceOperatorInfo {
L
Liu Jicong 已提交
685 686 687 688 689 690 691 692 693 694 695 696 697
  SSDataBlock*         pRes;
  STimeWindow          win;
  SInterval            interval;
  int64_t              current;
  SArray*              pPrevRow;     // SArray<SGroupValue>
  SArray*              pNextRow;     // SArray<SGroupValue>
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
  bool                 isPrevRowSet;
  bool                 isNextRowSet;
  int32_t              fillType;      // fill type
  SColumn              tsCol;         // primary timestamp column
  SExprSupp            scalarSup;     // scalar calculation
  struct SFillColInfo* pFillColInfo;  // fill column info
698 699
} STimeSliceOperatorInfo;

700
typedef struct SStateWindowOperatorInfo {
wmmhello's avatar
wmmhello 已提交
701
  // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
L
Liu Jicong 已提交
702 703
  SOptrBasicInfo binfo;
  SAggSupporter  aggSup;
704
  SExprSupp      scalarSup;
wmmhello's avatar
wmmhello 已提交
705

706 707
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
708
  SColumn            stateCol;  // start row index
709 710
  bool               hasKey;
  SStateKeys         stateKey;
711
  int32_t            tsSlotId;  // primary timestamp column slot id
712
  STimeWindowAggSupp twAggSup;
713 714
} SStateWindowOperatorInfo;

715 716 717
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)

H
Haojun Liao 已提交
718 719
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
                                   __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
720

H
Haojun Liao 已提交
721 722
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
723 724 725

void    initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void    cleanupBasicInfo(SOptrBasicInfo* pInfo);
726
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
727
void    cleanupExprSupp(SExprSupp* pSup);
728
void    destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
L
Liu Jicong 已提交
729
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
730
                    const char* pkey);
L
Liu Jicong 已提交
731
void    initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
732 733 734

void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf);
735 736
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf);
737

738 739
bool    hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void    initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
740
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
741

L
Liu Jicong 已提交
742 743
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
744

H
Haojun Liao 已提交
745
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
L
Liu Jicong 已提交
746 747
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                             SOperatorInfo* pOperator);
748

749 750
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);

L
Liu Jicong 已提交
751
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
752
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
753

L
Liu Jicong 已提交
754
void doDestroyExchangeOperatorInfo(void* param);
H
Haojun Liao 已提交
755

H
Haojun Liao 已提交
756
void    setOperatorCompleted(SOperatorInfo* pOperator);
L
Liu Jicong 已提交
757 758
void    setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                        void* pInfo, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
759
void    doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
L
Liu Jicong 已提交
760 761
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
762

L
Liu Jicong 已提交
763 764
void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
H
Haojun Liao 已提交
765
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
766 767

SSDataBlock* loadNextDataBlock(void* param);
768

769
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
770

L
Liu Jicong 已提交
771 772 773
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup);
H
Haojun Liao 已提交
774

775 776
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);

L
Liu Jicong 已提交
777 778
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                           SExecTaskInfo* pTaskInfo);
dengyihao's avatar
dengyihao 已提交
779 780
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
781 782
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo);
783

H
Haojun Liao 已提交
784
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
785

L
Liu Jicong 已提交
786 787 788 789
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
                                         SExecTaskInfo* pTaskInfo);
790
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
791 792 793 794
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
                                               SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo);
795

H
Haojun Liao 已提交
796
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
L
Liu Jicong 已提交
797
                                          SExecTaskInfo* pTaskInfo, bool isStream);
798 799 800 801
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
                                                      SExecTaskInfo* pTaskInfo);
802 803
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild);
H
Haojun Liao 已提交
804 805
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
                                            SExecTaskInfo* pTaskInfo);
806
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
807 808
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
                                               SExecTaskInfo* pTaskInfo);
809

810
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
811
                                            SExecTaskInfo* pTaskInfo);
812

813 814
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);

L
Liu Jicong 已提交
815 816 817 818 819
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
820
                                           SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
821

822
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
L
Liu Jicong 已提交
823
                                                 SExecTaskInfo* pTaskInfo);
824

825
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
826 827
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
828

L
Liu Jicong 已提交
829 830 831 832
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild);
833 834
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
835

L
Liu Jicong 已提交
836 837
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
838 839
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
                                            SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
840

841
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
L
Liu Jicong 已提交
842
                              int32_t numOfOutput, SArray* pPseudoList);
843

844
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
X
Xiaoyu Wang 已提交
845

846
bool    isTaskKilled(SExecTaskInfo* pTaskInfo);
847 848
int32_t checkForQueryBuf(size_t numOfTables);

H
Haojun Liao 已提交
849 850
void    setTaskKilled(SExecTaskInfo* pTaskInfo);
void    queryCostStatis(SExecTaskInfo* pTaskInfo);
dengyihao's avatar
dengyihao 已提交
851
void    doDestroyTask(SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
852
void    destroyOperatorInfo(SOperatorInfo* pOperator);
853 854
int32_t getMaximumIdleDurationSec();

wmmhello's avatar
wmmhello 已提交
855 856 857 858
/*
 * ops:     root operator
 * data:    *data save the result of encode, need to be freed by caller
 * length:  *length save the length of *data
C
Cary Xu 已提交
859
 * nOptrWithVal: *nOptrWithVal save the number of optr with value
wmmhello's avatar
wmmhello 已提交
860 861
 * return:  result code, 0 means success
 */
L
Liu Jicong 已提交
862
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal);
wmmhello's avatar
wmmhello 已提交
863 864 865 866 867 868 869

/*
 * ops:    root operator, created by caller
 * data:   save the result of decode
 * length: the length of data
 * return: result code, 0 means success
 */
H
Haojun Liao 已提交
870
int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
wmmhello's avatar
wmmhello 已提交
871

872
void    setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
dengyihao's avatar
dengyihao 已提交
873
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
D
dapan1121 已提交
874
                               char* sql, EOPTR_EXEC_MODEL model);
L
Liu Jicong 已提交
875
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
H
Haojun Liao 已提交
876
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
dengyihao's avatar
dengyihao 已提交
877

878
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
879
                                int32_t order);
880
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
L
Liu Jicong 已提交
881
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
5
54liuyao 已提交
882
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
883
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
5
54liuyao 已提交
884
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
885 886 887 888 889 890 891 892 893
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
894
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
L
Liu Jicong 已提交
895
void     calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
896

897 898
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
899

S
shenglian zhou 已提交
900 901
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
                                           SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
902 903
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo);
904

905
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
906

L
Liu Jicong 已提交
907 908
bool    groupbyTbname(SNodeList* pGroupList);
void*   destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
909
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
910
                                   SGroupResInfo* pGroupResInfo);
5
54liuyao 已提交
911
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
912
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
913
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
914 915
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
916 917
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
5
54liuyao 已提交
918
void    getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
L
Liu Jicong 已提交
919
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
920

921 922 923 924
#ifdef __cplusplus
}
#endif

925
#endif  // TDENGINE_EXECUTORIMPL_H