executorimpl.h 36.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H

18 19 20 21
#ifdef __cplusplus
extern "C" {
#endif

22
#include "os.h"
S
common  
Shengliang Guan 已提交
23
#include "tcommon.h"
24
#include "tlosertree.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tsort.h"
26 27 28
#include "ttszip.h"
#include "tvariant.h"

H
Haojun Liao 已提交
29
#include "dataSinkMgt.h"
30
#include "executil.h"
H
Haojun Liao 已提交
31
#include "executor.h"
H
Haojun Liao 已提交
32
#include "planner.h"
D
dapan1121 已提交
33
#include "scalar.h"
34 35
#include "taosdef.h"
#include "tarray.h"
5
54liuyao 已提交
36
#include "tfill.h"
H
Haojun Liao 已提交
37
#include "thash.h"
38
#include "tlockfree.h"
D
dapan1121 已提交
39
#include "tmsg.h"
dengyihao's avatar
dengyihao 已提交
40
#include "tpagedbuf.h"
L
Liu Jicong 已提交
41
#include "tstream.h"
L
Liu Jicong 已提交
42
#include "tstreamUpdate.h"
H
Haojun Liao 已提交
43

44
#include "executorInt.h"
L
Liu Jicong 已提交
45
#include "vnode.h"
H
Hongze Cheng 已提交
46

47 48
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);

5
54liuyao 已提交
49 50 51 52 53
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)

54
enum {
55 56
  // when this task starts to execute, this status will set
  TASK_NOT_COMPLETED = 0x1u,
57

58
  /* Task is over
59 60 61
   * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
   * 2. when all data within queried time window, it is also denoted as query_completed
   */
62
  TASK_COMPLETED = 0x2u,
63 64 65 66 67 68
};

/**
 * If the number of generated results is greater than this value,
 * query query will be halt and return results to client immediate.
 */
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73
typedef struct SResultInfo {  // TODO refactor
  int64_t totalRows;          // total generated result size in rows
  int64_t totalBytes;         // total results in bytes.
  int32_t capacity;           // capacity of current result output buffer
  int32_t threshold;          // result size threshold in rows.
H
Haojun Liao 已提交
74
} SResultInfo;
75 76

typedef struct STableQueryInfo {
L
Liu Jicong 已提交
77 78
  TSKEY              lastKey;  // last check ts, todo remove it later
  SResultRowPosition pos;      // current active time window
79 80
} STableQueryInfo;

H
Haojun Liao 已提交
81 82 83 84 85
typedef struct SLimit {
  int64_t limit;
  int64_t offset;
} SLimit;

86
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
87

H
Haojun Liao 已提交
88
typedef struct STaskCostInfo {
89 90 91 92 93
  int64_t                 created;
  int64_t                 start;
  uint64_t                elapsedTime;
  double                  extractListTime;
  double                  groupIdMapTime;
94
  SFileBlockLoadRecorder* pRecoder;
H
Haojun Liao 已提交
95
} STaskCostInfo;
96

H
Haojun Liao 已提交
97
typedef struct SOperatorCostInfo {
L
Liu Jicong 已提交
98 99
  double openCost;
  double totalCost;
H
Haojun Liao 已提交
100 101
} SOperatorCostInfo;

102 103
struct SOperatorInfo;

wmmhello's avatar
wmmhello 已提交
104
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
105
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
106

107
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
108
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
109
typedef void (*__optr_close_fn_t)(void* param);
110
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
111
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
H
Haojun Liao 已提交
112

H
Haojun Liao 已提交
113
typedef struct STaskIdInfo {
114 115 116 117
  uint64_t queryId;  // this is also a request id
  uint64_t subplanId;
  uint64_t templateId;
  char*    str;
D
dapan1121 已提交
118
  int32_t  vgId;
H
Haojun Liao 已提交
119 120
} STaskIdInfo;

L
Liu Jicong 已提交
121 122
enum {
  STREAM_RECOVER_STEP__NONE = 0,
123 124
  STREAM_RECOVER_STEP__PREPARE1,
  STREAM_RECOVER_STEP__PREPARE2,
5
54liuyao 已提交
125 126
  STREAM_RECOVER_STEP__SCAN1,
  STREAM_RECOVER_STEP__SCAN2,
L
Liu Jicong 已提交
127 128
};

L
Liu Jicong 已提交
129
typedef struct {
L
Liu Jicong 已提交
130
  // TODO remove prepareStatus
L
Liu Jicong 已提交
131 132 133 134 135 136
  STqOffsetVal prepareStatus;  // for tmq
  STqOffsetVal lastStatus;     // for tmq
  SMqMetaRsp   metaRsp;        // for tmq fetching meta
  int8_t       returned;
  int64_t      snapshotVer;
  // const SSubmitReq* pReq;
L
Liu Jicong 已提交
137

138
  SPackedData         submit;
L
Liu Jicong 已提交
139 140 141
  SSchemaWrapper*     schema;
  char                tbName[TSDB_TABLE_NAME_LEN];
  int8_t              recoverStep;
L
Liu Jicong 已提交
142
  int8_t              recoverScanFinished;
L
Liu Jicong 已提交
143
  SQueryTableDataCond tableCond;
L
Liu Jicong 已提交
144 145
  int64_t             fillHistoryVer1;
  int64_t             fillHistoryVer2;
L
liuyao 已提交
146
  int64_t             dataVersion;
L
liuyao 已提交
147 148
  SStreamState*       pState;
  int64_t             checkPointId;
L
Liu Jicong 已提交
149 150
} SStreamTaskInfo;

151 152 153 154 155
typedef struct {
  char*           tablename;
  char*           dbname;
  int32_t         tversion;
  SSchemaWrapper* sw;
156
  SSchemaWrapper* qsw;
157 158
} SSchemaInfo;

H
Haojun Liao 已提交
159
typedef struct SExchangeOpStopInfo {
L
Liu Jicong 已提交
160 161
  int32_t operatorType;
  int64_t refId;
D
dapan1121 已提交
162 163
} SExchangeOpStopInfo;

H
Haojun Liao 已提交
164
typedef struct STaskStopInfo {
D
dapan1121 已提交
165 166 167 168
  SRWLatch lock;
  SArray*  pStopInfo;
} STaskStopInfo;

H
Haojun Liao 已提交
169
struct SExecTaskInfo {
170 171 172 173
  STaskIdInfo           id;
  uint32_t              status;
  STimeWindow           window;
  STaskCostInfo         cost;
X
Xiaoyu Wang 已提交
174
  int64_t               owner;  // if it is in execution
175 176 177
  int32_t               code;
  int32_t               qbufQuota;  // total available buffer (in KB) during execution query
  int64_t               version;    // used for stream to record wal version, why not move to sschemainfo
L
Liu Jicong 已提交
178 179
  SStreamTaskInfo       streamInfo;
  SSchemaInfo           schemaInfo;
X
Xiaoyu Wang 已提交
180 181 182
  const char*           sql;        // query sql string
  jmp_buf               env;        // jump to this position when error happens.
  EOPTR_EXEC_MODEL      execModel;  // operator execution model [batch model|stream model]
L
Liu Jicong 已提交
183
  SSubplan*             pSubplan;
184
  struct SOperatorInfo* pRoot;
185
  SLocalFetch           localFetch;
L
Liu Jicong 已提交
186
  SArray*               pResultBlockList;  // result block list
D
dapan1121 已提交
187
  STaskStopInfo         stopInfo;
X
Xiaoyu Wang 已提交
188
  SRWLatch              lock;  // secure the access of STableListInfo
H
Haojun Liao 已提交
189
};
H
Haojun Liao 已提交
190

191
enum {
L
Liu Jicong 已提交
192 193
  OP_NOT_OPENED = 0x0,
  OP_OPENED = 0x1,
H
Haojun Liao 已提交
194
  OP_RES_TO_RETURN = 0x5,
L
Liu Jicong 已提交
195
  OP_EXEC_DONE = 0x9,
L
Liu Jicong 已提交
196
  OP_EXEC_RECV = 0x11,
197 198
};

199
typedef struct SOperatorFpSet {
L
Liu Jicong 已提交
200 201
  __optr_open_fn_t    _openFn;  // DO NOT invoke this function directly
  __optr_fn_t         getNextFn;
202
  __optr_fn_t         cleanupFn;  // call this function to release the allocated resources ASAP
L
Liu Jicong 已提交
203
  __optr_close_fn_t   closeFn;
L
Liu Jicong 已提交
204
  __optr_reqBuf_fn_t  reqBufFn;  // total used buffer for blocking operator
L
Liu Jicong 已提交
205 206 207
  __optr_encode_fn_t  encodeResultRow;
  __optr_decode_fn_t  decodeResultRow;
  __optr_explain_fn_t getExplainFn;
208 209
} SOperatorFpSet;

210 211
typedef struct SExprSupp {
  SExprInfo*      pExprInfo;
L
Liu Jicong 已提交
212
  int32_t         numOfExprs;  // the number of scalar expression in group operator
213 214
  SqlFunctionCtx* pCtx;
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
H
Haojun Liao 已提交
215
  SFilterInfo*    pFilterInfo;
216 217
} SExprSupp;

218
typedef struct SOperatorInfo {
219
  uint16_t               operatorType;
220
  int16_t                resultDataBlockId;
L
Liu Jicong 已提交
221 222 223 224 225 226 227 228 229 230 231
  bool                   blocking;  // block operator or not
  uint8_t                status;    // denote if current operator is completed
  char*                  name;      // name, for debug purpose
  void*                  info;      // extension attribution
  SExprSupp              exprSupp;
  SExecTaskInfo*         pTaskInfo;
  SOperatorCostInfo      cost;
  SResultInfo            resultInfo;
  struct SOperatorInfo** pDownstream;      // downstram pointer list
  int32_t                numOfDownstream;  // number of downstream. The value is always ONE expect for join operator
  SOperatorFpSet         fpSet;
232 233
} SOperatorInfo;

234 235
typedef enum {
  EX_SOURCE_DATA_NOT_READY = 0x1,
L
Liu Jicong 已提交
236
  EX_SOURCE_DATA_READY = 0x2,
237 238
  EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
239

240 241 242
#define COL_MATCH_FROM_COL_ID  0x1
#define COL_MATCH_FROM_SLOT_ID 0x2

H
Haojun Liao 已提交
243
typedef struct SLoadRemoteDataInfo {
dengyihao's avatar
dengyihao 已提交
244 245 246
  uint64_t totalSize;     // total load bytes from remote
  uint64_t totalRows;     // total number of rows
  uint64_t totalElapsed;  // total elapsed time
H
Haojun Liao 已提交
247 248
} SLoadRemoteDataInfo;

249
typedef struct SLimitInfo {
L
Liu Jicong 已提交
250 251 252 253 254 255 256
  SLimit   limit;
  SLimit   slimit;
  uint64_t currentGroupId;
  int64_t  remainGroupOffset;
  int64_t  numOfOutputGroups;
  int64_t  remainOffset;
  int64_t  numOfOutputRows;
257 258
} SLimitInfo;

259
typedef struct SExchangeInfo {
L
Liu Jicong 已提交
260 261 262 263
  SArray* pSources;
  SArray* pSourceDataInfo;
  tsem_t  ready;
  void*   pTransporter;
H
Haojun Liao 已提交
264

265 266
  // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
  // passed by downstream operator
L
Liu Jicong 已提交
267 268 269 270 271
  SArray*      pResultBlockList;
  SArray*      pRecycledBlocks;  // build a pool for small data block to avoid to repeatly create and then destroy.
  SSDataBlock* pDummyBlock;      // dummy block, not keep data
  bool         seqLoadData;      // sequential load data or not, false by default
  int32_t      current;
H
Haojun Liao 已提交
272
  SLoadRemoteDataInfo loadInfo;
273
  uint64_t            self;
274
  SLimitInfo          limitInfo;
L
Liu Jicong 已提交
275
  int64_t             openedTs;  // start exec time stamp, todo: move to SLoadRemoteDataInfo
276 277
} SExchangeInfo;

278
typedef struct SScanInfo {
L
Liu Jicong 已提交
279 280
  int32_t numOfAsc;
  int32_t numOfDesc;
281 282
} SScanInfo;

283
typedef struct SSampleExecInfo {
L
Liu Jicong 已提交
284 285
  double   sampleRatio;  // data block sample ratio, 1 by default
  uint32_t seed;         // random seed value
286 287
} SSampleExecInfo;

L
Liu Jicong 已提交
288 289
enum {
  TABLE_SCAN__TABLE_ORDER = 1,
290
  TABLE_SCAN__BLOCK_ORDER = 2,
L
Liu Jicong 已提交
291 292
};

293
typedef struct SAggSupporter {
H
Haojun Liao 已提交
294
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
295 296
  char*          keyBuf;               // window key buffer
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
L
Liu Jicong 已提交
297 298
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  int32_t        currentPageId;  // current write page id
299 300 301
} SAggSupporter;

typedef struct {
L
Liu Jicong 已提交
302 303 304 305 306
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
  // current data block needs to be loaded.
  SInterval      interval;
  SAggSupporter* pAggSup;
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
307 308
} SAggOptrPushDownInfo;

309
typedef struct STableMetaCacheInfo {
L
Liu Jicong 已提交
310 311 312
  SLRUCache* pTableMetaEntryCache;  // 100 by default
  uint64_t   metaFetch;
  uint64_t   cacheHit;
313 314
} STableMetaCacheInfo;

H
Haojun Liao 已提交
315
typedef struct STableScanBase {
316
  STsdbReader*           dataReader;
H
Haojun Liao 已提交
317 318 319 320
  SFileBlockLoadRecorder readRecorder;
  SQueryTableDataCond    cond;
  SAggOptrPushDownInfo   pdInfo;
  SColMatchInfo          matchInfo;
321
  SReadHandle            readHandle;
H
Haojun Liao 已提交
322 323 324
  SExprSupp              pseudoSup;
  STableMetaCacheInfo    metaCache;
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
H
Haojun Liao 已提交
325 326
  int32_t                dataBlockLoadFlag;
  SLimitInfo             limitInfo;
327
  // there are more than one table list exists in one task, if only one vnode exists.
X
Xiaoyu Wang 已提交
328
  STableListInfo* pTableListInfo;
H
Haojun Liao 已提交
329 330 331
} STableScanBase;

typedef struct STableScanInfo {
L
Liu Jicong 已提交
332 333 334 335 336 337 338 339 340 341
  STableScanBase  base;
  SScanInfo       scanInfo;
  int32_t         scanTimes;
  SSDataBlock*    pResBlock;
  SSampleExecInfo sample;  // sample execution info
  int32_t         currentGroupId;
  int32_t         currentTable;
  int8_t          scanMode;
  int8_t          assignBlockUid;
  bool            hasGroupByTag;
D
dapan1121 已提交
342
  bool            countOnly;
343 344
} STableScanInfo;

345
typedef struct STableMergeScanInfo {
L
Liu Jicong 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
  int32_t         tableStartIndex;
  int32_t         tableEndIndex;
  bool            hasGroupId;
  uint64_t        groupId;
  SArray*         queryConds;  // array of queryTableDataCond
  STableScanBase  base;
  int32_t         bufPageSize;
  uint32_t        sortBufSize;  // max buffer size for in-memory sort
  SArray*         pSortInfo;
  SSortHandle*    pSortHandle;
  SSDataBlock*    pSortInputBlock;
  int64_t         startTs;  // sort start time
  SArray*         sortSourceParams;
  SLimitInfo      limitInfo;
  int64_t         numOfRows;
  SScanInfo       scanInfo;
  int32_t         scanTimes;
  SSDataBlock*    pResBlock;
  SSampleExecInfo sample;  // sample execution info
  SSortExecInfo   sortExecInfo;
366 367
} STableMergeScanInfo;

368
typedef struct STagScanInfo {
369 370 371 372 373
  SColumnInfo*    pCols;
  SSDataBlock*    pRes;
  SColMatchInfo   matchInfo;
  int32_t         curPos;
  SReadHandle     readHandle;
374
  STableListInfo* pTableListInfo;
375 376
} STagScanInfo;

5
54liuyao 已提交
377 378 379 380
typedef enum EStreamScanMode {
  STREAM_SCAN_FROM_READERHANDLE = 1,
  STREAM_SCAN_FROM_RES,
  STREAM_SCAN_FROM_UPDATERES,
381
  STREAM_SCAN_FROM_DELETE_DATA,
5
54liuyao 已提交
382
  STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
L
Liu Jicong 已提交
383
  STREAM_SCAN_FROM_DATAREADER_RANGE,
5
54liuyao 已提交
384 385
} EStreamScanMode;

386 387 388 389 390
enum {
  PROJECT_RETRIEVE_CONTINUE = 0x1,
  PROJECT_RETRIEVE_DONE = 0x2,
};

5
54liuyao 已提交
391
typedef struct SStreamAggSupporter {
5
54liuyao 已提交
392 393 394
  int32_t         resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
  SSDataBlock*    pScanBlock;
  SStreamState*   pState;
395 396
  int64_t         gap;        // stream session window gap
  SqlFunctionCtx* pDummyCtx;  // for combine
5
54liuyao 已提交
397 398 399 400
  SSHashObj*      pResultRows;
  int32_t         stateKeySize;
  int16_t         stateKeyType;
  SDiskbasedBuf*  pResultBuf;
5
54liuyao 已提交
401 402
} SStreamAggSupporter;

403
typedef struct SWindowSupporter {
5
54liuyao 已提交
404
  SStreamAggSupporter* pStreamAggSup;
L
Liu Jicong 已提交
405
  int64_t              gap;
406
  uint16_t             parentType;
407
  SAggSupporter*       pIntervalAggSup;
408
} SWindowSupporter;
409

410
typedef struct SPartitionBySupporter {
L
Liu Jicong 已提交
411 412 413 414
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
  char*   keyBuf;         // group by keys for hash
  bool    needCalc;       // partition by column
415 416 417
} SPartitionBySupporter;

typedef struct SPartitionDataInfo {
L
Liu Jicong 已提交
418
  uint64_t groupId;
419 420
  char*    tbname;
  SArray*  tags;
L
Liu Jicong 已提交
421
  SArray*  rowIds;
422
} SPartitionDataInfo;
423

5
54liuyao 已提交
424
typedef struct STimeWindowAggSupp {
L
Liu Jicong 已提交
425
  int8_t          calTrigger;
L
Liu Jicong 已提交
426
  int8_t          calTriggerSaved;
5
54liuyao 已提交
427
  int64_t         deleteMark;
L
Liu Jicong 已提交
428 429
  int64_t         deleteMarkSaved;
  int64_t         waterMark;
L
Liu Jicong 已提交
430
  TSKEY           maxTs;
5
54liuyao 已提交
431
  TSKEY           minTs;
L
liuyao 已提交
432 433
  TSKEY           checkPointTs;
  TSKEY           checkPointInterval;
L
Liu Jicong 已提交
434
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
5
54liuyao 已提交
435 436
} STimeWindowAggSupp;

437
typedef struct SStreamScanInfo {
L
Liu Jicong 已提交
438 439 440 441 442 443 444 445 446
  uint64_t      tableUid;  // queried super table uid
  SExprInfo*    pPseudoExpr;
  int32_t       numOfPseudoExpr;
  SExprSupp     tbnameCalSup;
  SExprSupp     tagCalSup;
  int32_t       primaryTsIndex;  // primary time stamp slot id
  SReadHandle   readHandle;
  SInterval     interval;  // if the upstream is an interval operator, the interval info is also kept here.
  SColMatchInfo matchInfo;
L
Liu Jicong 已提交
447 448 449 450 451 452 453 454 455 456 457 458 459

  SArray*      pBlockLists;  // multiple SSDatablock.
  SSDataBlock* pRes;         // result SSDataBlock
  SSDataBlock* pUpdateRes;   // update SSDataBlock
  int32_t      updateResIndex;
  int32_t      blockType;        // current block type
  int32_t      validBlockIndex;  // Is current data has returned?
  uint64_t     numOfExec;        // execution times
  STqReader*   tqReader;

  uint64_t     groupId;
  SUpdateInfo* pUpdateInfo;

L
Liu Jicong 已提交
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
  EStreamScanMode       scanMode;
  SOperatorInfo*        pStreamScanOp;
  SOperatorInfo*        pTableScanOp;
  SArray*               childIds;
  SWindowSupporter      windowSup;
  SPartitionBySupporter partitionSup;
  SExprSupp*            pPartScalarSup;
  bool                  assignBlockUid;  // assign block uid to groupId, temporarily used for generating rollup SMA.
  int32_t               scanWinIndex;    // for state operator
  int32_t               pullDataResIndex;
  SSDataBlock*          pPullDataRes;    // pull data SSDataBlock
  SSDataBlock*          pDeleteDataRes;  // delete data SSDataBlock
  int32_t               deleteDataIndex;
  STimeWindow           updateWin;
  STimeWindowAggSupp    twAggSup;
  SSDataBlock*          pUpdateDataRes;
L
Liu Jicong 已提交
476
  // status for tmq
L
Liu Jicong 已提交
477 478 479
  SNodeList* pGroupTags;
  SNode*     pTagCond;
  SNode*     pTagIndexCond;
L
Liu Jicong 已提交
480 481

  // recover
5
54liuyao 已提交
482 483 484
  int32_t      blockRecoverContiCnt;
  int32_t      blockRecoverTotCnt;
  SSDataBlock* pRecoverRes;
L
Liu Jicong 已提交
485

5
54liuyao 已提交
486
  SSDataBlock* pCreateTbRes;
X
Xiaoyu Wang 已提交
487 488
  int8_t       igCheckUpdate;
  int8_t       igExpired;
489
} SStreamScanInfo;
H
Haojun Liao 已提交
490

L
Liu Jicong 已提交
491
typedef struct {
492 493 494 495 496
  SVnode*         vnode;
  SSDataBlock     pRes;  // result SSDataBlock
  STsdbReader*    dataReader;
  SSnapContext*   sContext;
  STableListInfo* pTableListInfo;
L
Liu Jicong 已提交
497
} SStreamRawScanInfo;
498

S
slzhou 已提交
499 500 501 502
typedef struct STableCountScanSupp {
  int16_t dbNameSlotId;
  int16_t stbNameSlotId;
  int16_t tbCountSlotId;
503 504
  bool    groupByDbName;
  bool    groupByStbName;
S
slzhou 已提交
505 506
  char    dbNameFilter[TSDB_DB_NAME_LEN];
  char    stbNameFilter[TSDB_TABLE_NAME_LEN];
S
slzhou 已提交
507 508 509 510 511 512 513 514
} STableCountScanSupp;

typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;

  STableCountScanSupp supp;

L
Liu Jicong 已提交
515 516
  int32_t currGrpIdx;
  SArray* stbUidList;  // when group by db_name and/or stable_name
S
slzhou 已提交
517
} STableCountScanOperatorInfo;
S
shenglian zhou 已提交
518

519
typedef struct SOptrBasicInfo {
L
Liu Jicong 已提交
520 521 522
  SResultRowInfo resultRowInfo;
  SSDataBlock*   pRes;
  bool           mergeResultBlock;
523 524
} SOptrBasicInfo;

525 526 527 528 529 530 531 532 533 534
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;
  SAggSupporter    aggSup;
  STableQueryInfo* current;
  uint64_t         groupId;
  SGroupResInfo    groupResInfo;
  SExprSupp        scalarExprSup;
  bool             groupKeyOptimized;
} SAggOperatorInfo;

535
typedef struct SIntervalAggOperatorInfo {
536
  SOptrBasicInfo     binfo;              // basic info
wmmhello's avatar
wmmhello 已提交
537
  SAggSupporter      aggSup;             // aggregate supporter
538
  SExprSupp          scalarSupp;         // supporter for perform scalar function
539 540 541 542 543
  SGroupResInfo      groupResInfo;       // multiple results build supporter
  SInterval          interval;           // interval info
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
  STimeWindow        win;                // query time range
  bool               timeWindowInterpo;  // interpolation needed or not
544
  SArray*            pInterpCols;        // interpolation columns
545
  int32_t            resultTsOrder;      // result timestamp order
546
  int32_t            inputOrder;         // input data ts order
547 548
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
  STimeWindowAggSupp twAggSup;
L
Liu Jicong 已提交
549
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
550
} SIntervalAggOperatorInfo;
551

552
typedef struct SMergeAlignedIntervalAggOperatorInfo {
L
Liu Jicong 已提交
553
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
554

555 556
  uint64_t     groupId;  // current groupId
  int64_t      curTs;    // current ts
557
  SSDataBlock* prefetchedBlock;
558
  SResultRow*  pResultRow;
559 560
} SMergeAlignedIntervalAggOperatorInfo;

561
typedef struct SStreamIntervalOperatorInfo {
562 563 564 565 566 567
  SOptrBasicInfo     binfo;           // basic info
  SAggSupporter      aggSup;          // aggregate supporter
  SExprSupp          scalarSupp;      // supporter for perform scalar function
  SGroupResInfo      groupResInfo;    // multiple results build supporter
  SInterval          interval;        // interval info
  int32_t            primaryTsIndex;  // primary time stamp slot id from result of downstream operator.
568 569 570
  STimeWindowAggSupp twAggSup;
  bool               invertible;
  bool               ignoreExpiredData;
571
  bool               ignoreExpiredDataSaved;
572
  SArray*            pDelWins;  // SWinRes
573 574
  int32_t            delIndex;
  SSDataBlock*       pDelRes;
575
  SPhysiNode*        pPhyNode;  // create new child
5
54liuyao 已提交
576
  SHashObj*          pPullDataMap;
577
  SArray*            pPullWins;  // SPullWindowInfo
5
54liuyao 已提交
578 579
  int32_t            pullIndex;
  SSDataBlock*       pPullDataRes;
580 581 582 583
  bool               isFinal;
  SArray*            pChildren;
  SStreamState*      pState;
  SWinKey            delKey;
5
54liuyao 已提交
584
  uint64_t           numOfDatapack;
5
54liuyao 已提交
585
  SArray*            pUpdated;
586
  SSHashObj*         pUpdatedMap;
L
liuyao 已提交
587
  int64_t            dataVersion;
588
} SStreamIntervalOperatorInfo;
5
54liuyao 已提交
589

H
Haojun Liao 已提交
590
typedef struct SDataGroupInfo {
L
Liu Jicong 已提交
591 592 593
  uint64_t groupId;
  int64_t  numOfRows;
  SArray*  pPageList;
H
Haojun Liao 已提交
594 595
} SDataGroupInfo;

596
typedef struct SWindowRowsSup {
dengyihao's avatar
dengyihao 已提交
597 598 599 600
  STimeWindow win;
  TSKEY       prevTs;
  int32_t     startRowIndex;
  int32_t     numOfRows;
601
  uint64_t    groupId;
602 603
} SWindowRowsSup;

5
54liuyao 已提交
604
typedef struct SResultWindowInfo {
605 606 607
  void*       pOutputBuf;
  SSessionKey sessionWin;
  bool        isOutput;
5
54liuyao 已提交
608 609
} SResultWindowInfo;

5
54liuyao 已提交
610 611
typedef struct SStateWindowInfo {
  SResultWindowInfo winInfo;
5
54liuyao 已提交
612
  SStateKeys*       pStateKey;
5
54liuyao 已提交
613 614
} SStateWindowInfo;

5
54liuyao 已提交
615
typedef struct SStreamSessionAggOperatorInfo {
L
Liu Jicong 已提交
616 617
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
618
  SExprSupp           scalarSupp;  // supporter for perform scalar function
L
Liu Jicong 已提交
619 620 621 622 623
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  int32_t             endTsIndex;      // window end timestamp slot id
  int32_t             order;           // current SSDataBlock scan order
  STimeWindowAggSupp  twAggSup;
624 625 626
  SSDataBlock*        pWinBlock;   // window result
  SSDataBlock*        pDelRes;     // delete result
  SSDataBlock*        pUpdateRes;  // update window
L
Liu Jicong 已提交
627
  bool                returnUpdate;
5
54liuyao 已提交
628
  SSHashObj*          pStDeleted;
L
Liu Jicong 已提交
629
  void*               pDelIterator;
630 631
  SArray*             pChildren;  // cache for children's result; final stream operator
  SPhysiNode*         pPhyNode;   // create new child
L
Liu Jicong 已提交
632 633
  bool                isFinal;
  bool                ignoreExpiredData;
634
  bool                ignoreExpiredDataSaved;
5
54liuyao 已提交
635 636
  SArray*             pUpdated;
  SSHashObj*          pStUpdated;
L
liuyao 已提交
637
  int64_t             dataVersion;
5
54liuyao 已提交
638 639
} SStreamSessionAggOperatorInfo;

5
54liuyao 已提交
640 641 642
typedef struct SStreamStateAggOperatorInfo {
  SOptrBasicInfo      binfo;
  SStreamAggSupporter streamAggSup;
643
  SExprSupp           scalarSupp;  // supporter for perform scalar function
5
54liuyao 已提交
644 645 646 647 648 649 650
  SGroupResInfo       groupResInfo;
  int32_t             primaryTsIndex;  // primary timestamp slot id
  STimeWindowAggSupp  twAggSup;
  SColumn             stateCol;
  SSDataBlock*        pDelRes;
  SSHashObj*          pSeDeleted;
  void*               pDelIterator;
651
  SArray*             pChildren;  // cache for children's result;
5
54liuyao 已提交
652
  bool                ignoreExpiredData;
653
  bool                ignoreExpiredDataSaved;
5
54liuyao 已提交
654 655
  SArray*             pUpdated;
  SSHashObj*          pSeUpdated;
L
liuyao 已提交
656
  int64_t             dataVersion;
5
54liuyao 已提交
657 658
} SStreamStateAggOperatorInfo;

659 660 661 662
typedef struct SStreamPartitionOperatorInfo {
  SOptrBasicInfo        binfo;
  SPartitionBySupporter partitionSup;
  SExprSupp             scalarSup;
663
  SExprSupp             tbnameCalSup;
L
Liu Jicong 已提交
664
  SExprSupp             tagCalSup;
665 666
  SHashObj*             pPartitions;
  void*                 parIte;
5
54liuyao 已提交
667
  void*                 pTbNameIte;
668 669
  SSDataBlock*          pInputDataBlock;
  int32_t               tsColIndex;
670
  SSDataBlock*          pDelRes;
5
54liuyao 已提交
671
  SSDataBlock*          pCreateTbRes;
672 673
} SStreamPartitionOperatorInfo;

5
54liuyao 已提交
674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
typedef struct SStreamFillSupporter {
  int32_t        type;  // fill type
  SInterval      interval;
  SResultRowData prev;
  SResultRowData cur;
  SResultRowData next;
  SResultRowData nextNext;
  SFillColInfo*  pAllColInfo;  // fill exprs and not fill exprs
  SExprSupp      notFillExprSup;
  int32_t        numOfAllCols;  // number of all exprs, including the tags columns
  int32_t        numOfFillCols;
  int32_t        numOfNotFillCols;
  int32_t        rowSize;
  SSHashObj*     pResMap;
  bool           hasDelete;
} SStreamFillSupporter;

5
54liuyao 已提交
691 692
typedef struct SStreamFillOperatorInfo {
  SStreamFillSupporter* pFillSup;
693 694 695 696 697 698
  SSDataBlock*          pRes;
  SSDataBlock*          pSrcBlock;
  int32_t               srcRowIndex;
  SSDataBlock*          pSrcDelBlock;
  int32_t               srcDelRowIndex;
  SSDataBlock*          pDelRes;
L
Liu Jicong 已提交
699
  SColMatchInfo         matchInfo;
700 701 702
  int32_t               primaryTsCol;
  int32_t               primarySrcSlotId;
  SStreamFillInfo*      pFillInfo;
5
54liuyao 已提交
703 704
} SStreamFillOperatorInfo;

705 706 707
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)

X
Xiaoyu Wang 已提交
708 709
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model,
                                    char* dbFName);
710

H
Haojun Liao 已提交
711
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
712 713
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
int32_t        optrDummyOpenFn(SOperatorInfo* pOperator);
H
Haojun Liao 已提交
714 715 716 717 718
int32_t        appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void           setOperatorCompleted(SOperatorInfo* pOperator);
void           setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                               void* pInfo, SExecTaskInfo* pTaskInfo);
void           destroyOperatorInfo(SOperatorInfo* pOperator);
719
int32_t        optrDefaultBufFn(SOperatorInfo* pOperator);
720

L
Liu Jicong 已提交
721 722
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
H
Haojun Liao 已提交
723

724
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
725
void    cleanupExprSupp(SExprSupp* pSup);
H
Haojun Liao 已提交
726

L
Liu Jicong 已提交
727
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
H
Haojun Liao 已提交
728

H
Haojun Liao 已提交
729
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
L
Liu Jicong 已提交
730
                   const char* pkey, void* pState);
H
Haojun Liao 已提交
731 732
void    cleanupAggSup(SAggSupporter* pAggSup);

L
Liu Jicong 已提交
733
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
734 735
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf);
736

H
Haojun Liao 已提交
737
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
738
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
H
Haojun Liao 已提交
739
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
740
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
741
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
742

H
Haojun Liao 已提交
743 744
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
745

H
Haojun Liao 已提交
746
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
747
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
L
Liu Jicong 已提交
748
                             SOperatorInfo* pOperator);
749

750 751
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);

752
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id);
X
Xiaoyu Wang 已提交
753 754
int32_t        getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t        getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
755

H
Haojun Liao 已提交
756
extern void doDestroyExchangeOperatorInfo(void* param);
H
Haojun Liao 已提交
757

H
Haojun Liao 已提交
758
void    doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
L
Liu Jicong 已提交
759 760
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
761

L
Liu Jicong 已提交
762
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
H
Haojun Liao 已提交
763
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
764

765
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
766
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
767

L
Liu Jicong 已提交
768 769
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
D
dapan1121 已提交
770
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
H
Haojun Liao 已提交
771 772
// operator creater functions
// clang-format off
773 774
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);

775
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
776

777
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
778

779
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
780 781

SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
782

783 784
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo);

H
Haojun Liao 已提交
785
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
786

H
Haojun Liao 已提交
787 788 789 790
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);

791
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
792 793 794

SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);

795
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
796

5
54liuyao 已提交
797
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
798 799 800 801 802 803 804 805 806

SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);

SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo);

807
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
808

809
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
810

811
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
812

813 814
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);

H
Haojun Liao 已提交
815 816 817
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
818

H
Haojun Liao 已提交
819
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
820

H
Haojun Liao 已提交
821
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
822

823
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
L
Liu Jicong 已提交
824

H
Haojun Liao 已提交
825 826 827 828 829 830 831 832 833
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);

SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);

SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
5
54liuyao 已提交
834

H
Haojun Liao 已提交
835
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
836 837

SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
838 839

SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
H
Haojun Liao 已提交
840
// clang-format on
5
54liuyao 已提交
841

842
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
L
Liu Jicong 已提交
843
                              int32_t numOfOutput, SArray* pPseudoList);
844

845
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
X
Xiaoyu Wang 已提交
846

847 848
int32_t checkForQueryBuf(size_t numOfTables);

L
Liu Jicong 已提交
849 850 851 852
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
H
Haojun Liao 已提交
853

D
dapan1121 已提交
854 855
char* buildTaskId(uint64_t taskId, uint64_t queryId);

856 857
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);

D
dapan1121 已提交
858
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
X
Xiaoyu Wang 已提交
859 860 861
                           int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo,
                            SReadHandle* readHandle);
H
Haojun Liao 已提交
862
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
dengyihao's avatar
dengyihao 已提交
863

864
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
865
                                int32_t order);
866
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
L
Liu Jicong 已提交
867
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
5
54liuyao 已提交
868
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
869
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
5
54liuyao 已提交
870
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
871 872 873 874 875 876 877
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName);
L
Liu Jicong 已提交
878
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
879

880 881
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
882

L
Liu Jicong 已提交
883
bool    groupbyTbname(SNodeList* pGroupList);
5
54liuyao 已提交
884
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
885
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
886
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
887 888
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
5
54liuyao 已提交
889
void    getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
L
Liu Jicong 已提交
890
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
X
Xiaoyu Wang 已提交
891 892
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
                               int64_t* pData);
5
54liuyao 已提交
893
void    appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
5
54liuyao 已提交
894 895 896 897
                             SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);

SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo*   createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
898

H
Haojun Liao 已提交
899
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
900 901
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
902
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
H
Haojun Liao 已提交
903

904 905 906 907
#ifdef __cplusplus
}
#endif

908
#endif  // TDENGINE_EXECUTORIMPL_H