qExecutor.h 15.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H

#include "os.h"

#include "hash.h"
H
Haojun Liao 已提交
21
#include "qAggMain.h"
H
Haojun Liao 已提交
22 23
#include "qFill.h"
#include "qResultbuf.h"
24
#include "qSqlparser.h"
H
Haojun Liao 已提交
25
#include "qTsbuf.h"
26
#include "query.h"
27
#include "taosdef.h"
H
Haojun Liao 已提交
28
#include "tarray.h"
B
Bomin Zhang 已提交
29
#include "tlockfree.h"
H
Haojun Liao 已提交
30
#include "tsdb.h"
31 32

struct SColumnFilterElem;

/*
 * Signature of a column filter callback. It receives the filter element being
 * evaluated, two raw value pointers and a type code, and returns a bool.
 * NOTE(review): the exact meaning of val1/val2/type is defined by the
 * implementations, which are not visible in this header.
 */
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);

/*
 * Signature of a search routine over a data buffer of `num` entries for `key`,
 * parameterized by `order`; returns an int32_t.
 * NOTE(review): presumably the returned value is a position in `data` -- confirm
 * against the implementations.
 */
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
35

H
Haojun Liao 已提交
36 37 38 39 40 41 42 43 44
// True when the `code` field of _q equals TSDB_CODE_TSC_QUERY_CANCELLED.
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
// True when p and s share at least one set bit. Despite the name this is a
// bit-intersection test, not a strict equality comparison.
#define Q_STATUS_EQUAL(p, s)  (((p) & (s)) != 0u)
// True when the forward-direction factor derived from q->order.order equals QUERY_ASC_FORWARD_STEP.
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)

// Sets tableIndex to the number of tables, i.e. the condition that IS_STASBLE_QUERY_OVER tests for.
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
// True once tableIndex has reached (or passed) the number of tables.
// The "STASBLE" spelling is part of the existing macro name and is kept for callers.
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))

// Fetches the pointer stored at _index in q's group list and casts it to SArray*.
#define GET_TABLEGROUP(q, _index)   ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))

H
Haojun Liao 已提交
45 46
// Number of rows in _r's output buffer, or 0 when no output buffer is attached.
// Note that _r is evaluated twice, so it must not have side effects.
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)

H
Haojun Liao 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
/*
 * Query status flags. Each value is a distinct bit, so they can be tested with
 * Q_STATUS_EQUAL (a bitwise-and test).
 */
enum {
  // set when the query starts to execute
      QUERY_NOT_COMPLETED = 0x1u,

  /* The result output buffer is full and the current query is paused.
   * This status only exists for group-by clauses and diff/add/division/multiply queries.
   */
      QUERY_RESBUF_FULL = 0x2u,

  /* The query is over.
   * 1. This status is used in one-row-result query processing, e.g., count/sum/first/last/avg, etc.
   * 2. When all data within the queried time window has been handled, it is also denoted as query completed.
   */
      QUERY_COMPLETED = 0x4u,

  /* Used when the result has not been completely returned to the client; this status is
   * usually used in the case of an interval query with the interpolation option.
   */
      QUERY_OVER = 0x8u,
};

H
Haojun Liao 已提交
68
typedef struct SResultRowPool {
69 70 71 72 73 74 75 76 77 78
  int32_t elemSize;
  int32_t blockSize;
  int32_t numOfElemPerBlock;

  struct {
    int32_t blockIndex;
    int32_t pos;
  } position;

  SArray* pData;    // SArray<void*>
H
Haojun Liao 已提交
79
} SResultRowPool;
80

81
typedef struct SSqlGroupbyExpr {
H
Haojun Liao 已提交
82 83 84 85 86
  int16_t tableIndex;
  SArray* columnInfo;  // SArray<SColIndex>, group by columns information
  int16_t numOfGroupCols;
  int16_t orderIndex;  // order by column index
  int16_t orderType;   // order by type: asc/desc
87 88
} SSqlGroupbyExpr;

H
Haojun Liao 已提交
89
typedef struct SResultRow {
90
  int32_t       pageId;      // pageId & rowId is the position of current result in disk-based output buffer
H
Haojun Liao 已提交
91
  int32_t       offset:29;    // row index in buffer page
92 93 94 95
  bool          startInterp; // the time window start timestamp has done the interpolation already.
  bool          endInterp;   // the time window end timestamp has done the interpolation already.
  bool          closed;      // this result status: closed or opened
  uint32_t      numOfRows;   // number of rows of current time window
H
Haojun Liao 已提交
96
  SResultRowCellInfo*  pCellInfo;  // For each result column, there is a resultInfo
97
  union {STimeWindow win; char* key;};  // start key of current time window
H
Haojun Liao 已提交
98
} SResultRow;
99

100
typedef struct SGroupResInfo {
H
Haojun Liao 已提交
101 102
  int32_t totalGroup;
  int32_t currentGroup;
103 104 105 106
  int32_t index;
  SArray* pRows;      // SArray<SResultRow*>
} SGroupResInfo;

H
Haojun Liao 已提交
107 108 109 110
/**
 * Bookkeeping for the results returned to the client.
 *
 * `threshold`: if the number of generated results is greater than this value,
 * the query is halted and the results are returned to the client immediately.
 */
typedef struct SRspResultInfo {
  int64_t total;      // total generated result size in rows
  int64_t capacity;   // capacity of current result output buffer
  int32_t threshold;  // result size threshold in rows.
} SRspResultInfo;
116

H
Haojun Liao 已提交
117
typedef struct SResultRowInfo {
H
Haojun Liao 已提交
118 119 120 121 122 123
  SResultRow** pResult;    // result list
  int16_t      type:8;     // data type for hash key
  int32_t      size:24;    // number of result set
  int32_t      capacity;   // max capacity
  int32_t      curIndex;   // current start active index
  int64_t      prevSKey;   // previous (not completed) sliding window start key
H
Haojun Liao 已提交
124
} SResultRowInfo;
125 126 127 128 129 130 131 132

/*
 * One filter attached to a column: the filter callback together with the
 * filter description (SColumnFilterInfo) it belongs to.
 */
typedef struct SColumnFilterElem {
  int16_t           bytes;  // column length
  __filter_func_t   fp;     // filter callback, see __filter_func_t
  SColumnFilterInfo filterInfo;
} SColumnFilterElem;

typedef struct SSingleColumnFilterInfo {
H
Haojun Liao 已提交
133
  void*              pData;
134
  int32_t            numOfFilters;
H
Haojun Liao 已提交
135
  SColumnInfo        info;
136 137 138
  SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo;

H
Haojun Liao 已提交
139
typedef struct STableQueryInfo {
H
hjxilinx 已提交
140
  TSKEY       lastKey;
H
Haojun Liao 已提交
141
  int32_t     groupIndex;     // group id in table list
142
  int16_t     queryRangeSet;  // denote if the query range is set, only available for interval query
143
  tVariant    tag;
H
hjxilinx 已提交
144
  STimeWindow win;
145
  STSCursor   cur;
H
Haojun Liao 已提交
146
  void*       pTable;         // for retrieve the page id list
H
Haojun Liao 已提交
147
  SResultRowInfo resInfo;
148 149
} STableQueryInfo;

H
Haojun Liao 已提交
150 151 152 153 154 155 156 157 158
/*
 * Cost counters collected for one query (stored as SQInfo.summary).
 * NOTE(review): the units of the *Time and *Size members are not visible in
 * this header -- confirm against the code that updates them.
 */
typedef struct SQueryCostInfo {
  uint64_t loadStatisTime;
  uint64_t loadFileBlockTime;
  uint64_t loadDataInCacheTime;
  uint64_t loadStatisSize;
  uint64_t loadFileBlockSize;
  uint64_t loadDataInCacheSize;

  uint64_t loadDataTime;
  uint64_t totalRows;
  uint64_t totalCheckedRows;
  uint32_t totalBlocks;
  uint32_t loadBlocks;
  uint32_t loadBlockStatis;
  uint32_t discardBlocks;
  uint64_t elapsedTime;
  uint64_t firstStageMergeTime;
  uint64_t winInfoSize;
  uint64_t tableInfoSize;
  uint64_t hashSize;
  uint64_t numOfTimeWindows;
} SQueryCostInfo;
H
hjxilinx 已提交
172

H
Haojun Liao 已提交
173 174 175 176 177
/*
 * State for ordered projection queries. SQuery.prjInfo (of this type) is
 * described there as the limit value for each vgroup, only available in global
 * order projection queries.
 */
typedef struct {
  int64_t vgroupLimit;
  int64_t ts;
} SOrderedPrjQueryInfo;

178 179 180 181 182
/*
 * Intermediate result entry: a tag buffer plus the list of partial results
 * associated with it (elements of SQueryRuntimeEnv.prevResult are of this type).
 */
typedef struct {
  char*   tags;
  SArray* pResult;  // SArray<SStddevInterResult>
} SInterResult;

H
Haojun Liao 已提交
183 184 185 186 187 188
/*
 * A block of data as exchanged between operators (it is the return type of
 * __operator_fn_t): block statistics, the data itself and the block info.
 * NOTE(review): the element type stored in pDataBlock is not stated in this
 * header -- confirm before relying on it.
 */
typedef struct SSDataBlock {
  SDataStatis *pBlockStatis;
  SArray      *pDataBlock;
  SDataBlockInfo info;
} SSDataBlock;

189
typedef struct SQuery {
H
Haojun Liao 已提交
190 191
  SLimitVal        limit;

H
Haojun Liao 已提交
192 193 194 195 196 197 198 199 200
  bool             stableQuery;      // super table query or not
  bool             topBotQuery;      // TODO used bitwise flag
  bool             groupbyColumn;    // denote if this is a groupby normal column query
  bool             hasTagResults;    // if there are tag values in final result or not
  bool             timeWindowInterpo;// if the time window start/end required interpolation
  bool             queryWindowIdentical; // all query time windows are identical for all tables in one group
  bool             queryBlockDist;    // if query data block distribution
  bool             stabledev;        // super table stddev query
  int32_t          interBufSize;     // intermediate buffer sizse
H
Haojun Liao 已提交
201 202 203

  SOrderVal        order;

H
Haojun Liao 已提交
204 205
  int16_t          numOfCols;
  int16_t          numOfTags;
H
Haojun Liao 已提交
206

H
Haojun Liao 已提交
207
  STimeWindow      window;
208
  SInterval        interval;
209
  int16_t          precision;
H
Haojun Liao 已提交
210 211
  int16_t          numOfOutput;
  int16_t          fillType;
212 213 214 215 216 217
  int16_t          checkResultBuf;   // check if the buffer is full during scan each block

  int32_t          srcRowSize;       // todo extract struct
  int32_t          resultRowSize;
  int32_t          maxSrcColumnSize;
  int32_t          tagLen;           // tag value length of current query
H
Haojun Liao 已提交
218
  SSqlGroupbyExpr* pGroupbyExpr;
H
Haojun Liao 已提交
219
  SExprInfo*       pExpr1;
H
Haojun Liao 已提交
220 221
  SExprInfo*       pExpr2;
  int32_t          numOfExpr2;
H
Haojun Liao 已提交
222 223 224 225
  SColumnInfo*     colList;
  SColumnInfo*     tagColList;
  int32_t          numOfFilterCols;
  int64_t*         fillVal;
H
Haojun Liao 已提交
226 227 228
  SOrderedPrjQueryInfo prjInfo;        // limit value for each vgroup, only available in global order projection query.
  SSingleColumnFilterInfo* pFilterInfo;

229
  uint32_t         status;             // query status
H
Haojun Liao 已提交
230
  STableQueryInfo* current;
H
Haojun Liao 已提交
231 232
  void*            tsdb;
  SMemRef          memRef;
H
Haojun Liao 已提交
233
  STableGroupInfo  tableGroupInfo;       // table <tid, last_key> list  SArray<STableKeyInfo>
H
Haojun Liao 已提交
234
  int32_t          vgId;
235 236
} SQuery;

H
Haojun Liao 已提交
237
/*
 * Operator execution callback: takes an opaque parameter and returns a pointer
 * to a data block.
 * NOTE(review): what a NULL return means is not visible in this header --
 * confirm against the operator implementations.
 */
typedef SSDataBlock* (*__operator_fn_t)(void* param);

/* Operator cleanup callback: takes an opaque parameter and a count. */
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);

struct SOperatorInfo;
H
Haojun Liao 已提交
241

H
Haojun Liao 已提交
242 243 244 245
/* Wrapper around the stdio stream of a result file. */
typedef struct {
  FILE* file;   // file struct pointer
} SFileResultInfo;

246
typedef struct SQueryRuntimeEnv {
H
Haojun Liao 已提交
247
  jmp_buf              env;
H
Haojun Liao 已提交
248
  SQuery*              pQuery;
H
Haojun Liao 已提交
249
  void*                qinfo;
H
Haojun Liao 已提交
250
  uint16_t             scanFlag;         // denotes reversed scan of data or not
H
Haojun Liao 已提交
251
  SFillInfo*           pFillInfo;        // todo move to operatorInfo
H
Haojun Liao 已提交
252
  void*                pQueryHandle;
H
Haojun Liao 已提交
253

H
Haojun Liao 已提交
254
  int32_t              prevGroupId;      // previous executed group id
H
Haojun Liao 已提交
255
  SDiskbasedResultBuf* pResultBuf;       // query result buffer based on blocked-wised disk file
H
Haojun Liao 已提交
256
  SHashObj*            pResultRowHashTable; // quick locate the window object for each result
H
Haojun Liao 已提交
257
  char*                keyBuf;           // window key buffer
258
  SResultRowPool*      pool;             // window result object pool
259
  char**               prevRow;
260

261 262 263 264 265
  SArray*              prevResult;       // intermediate result, SArray<SInterResult>
  STSBuf*              pTsBuf;           // timestamp filter list
  STSCursor            cur;

  char*                tagVal;           // tag value of current data block
266
  SArithmeticSupport  *sasArray;
H
Haojun Liao 已提交
267

H
Haojun Liao 已提交
268 269 270 271 272 273 274
  SSDataBlock         *outputBuf;
  int32_t              tableIndex;  //TODO remove it
  STableGroupInfo      tableqinfoGroupInfo;  // this is a group array list, including SArray<STableQueryInfo*> structure
  struct SOperatorInfo *proot;
  struct SOperatorInfo *pTableScanner;   // table scan operator
  SGroupResInfo         groupResInfo;
  int64_t               currentOffset;   // dynamic offset value
H
Haojun Liao 已提交
275 276

  SRspResultInfo   resultInfo;
277 278
} SQueryRuntimeEnv;

H
Haojun Liao 已提交
279 280 281 282 283 284
/*
 * Operator execution states.
 * NOTE(review): presumably the values stored in SOperatorInfo.status -- confirm
 * against the operator implementations.
 */
enum {
  OP_IN_EXECUTING   = 1,
  OP_RES_TO_RETURN  = 2,
  OP_EXEC_DONE      = 3,
};

H
Haojun Liao 已提交
285
typedef struct SOperatorInfo {
H
Haojun Liao 已提交
286 287
  uint8_t           operatorType;
  bool              blockingOptr;  // block operator or not
H
Haojun Liao 已提交
288
  uint8_t           status;        // denote if current operator is completed
H
Haojun Liao 已提交
289 290 291 292 293
  int32_t           numOfOutput;   // number of columns of the current operator results
  char             *name;          // name, used to show the query execution plan
  void             *info;          // extension attribution
  SExprInfo        *pExpr;
  SQueryRuntimeEnv *pRuntimeEnv;
H
Haojun Liao 已提交
294

H
Haojun Liao 已提交
295
  struct SOperatorInfo *upstream;
H
Haojun Liao 已提交
296
  __operator_fn_t       exec;
H
Haojun Liao 已提交
297
  __optr_cleanup_fn_t   cleanup;
H
Haojun Liao 已提交
298 299
} SOperatorInfo;

300 301 302 303 304
/*
 * Readiness of the query result.
 * NOTE(review): presumably the values stored in SQInfo.dataReady -- confirm
 * against the code that sets that field.
 */
enum {
  QUERY_RESULT_NOT_READY = 1,
  QUERY_RESULT_READY     = 2,
};

305
typedef struct SQInfo {
H
Haojun Liao 已提交
306
  void*            signature;
H
Haojun Liao 已提交
307
  int32_t          code;   // error code to returned to client
308
  int64_t          owner;  // if it is in execution
H
Haojun Liao 已提交
309

310
  SQueryRuntimeEnv runtimeEnv;
H
Haojun Liao 已提交
311
  SQuery           query;
312
  SHashObj*        arrTableIdInfo;
H
Haojun Liao 已提交
313

314 315 316 317
  /*
   * the query is executed position on which meter of the whole list.
   * when the index reaches the last one of the list, it means the query is completed.
   */
318
  void*            pBuf;        // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
H
Haojun Liao 已提交
319

320
  pthread_mutex_t  lock;        // used to synchronize the rsp/query threads
H
Haojun Liao 已提交
321
  tsem_t           ready;
322 323
  int32_t          dataReady;   // denote if query result is ready or not
  void*            rspContext;  // response context
324
  int64_t          startExecTs; // start to exec timestamp
325
  char*            sql;         // query sql string
H
Haojun Liao 已提交
326
  SQueryCostInfo   summary;
327 328
} SQInfo;

H
Haojun Liao 已提交
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
/*
 * Intermediate values produced while turning a query message into a query:
 * convertQueryMsg() takes a pointer to this struct and freeParam() takes one to
 * release it.
 * NOTE(review): which members are owned (and freed) by freeParam is not visible
 * in this header -- confirm in the implementation.
 */
typedef struct SQueryParam {
  char            *sql;
  char            *tagCond;
  char            *tbnameCond;
  char            *prevResult;
  SArray          *pTableIdList;
  SSqlFuncMsg    **pExprMsg;
  SSqlFuncMsg    **pSecExprMsg;
  SExprInfo       *pExprs;
  SExprInfo       *pSecExprs;

  SColIndex       *pGroupColIndex;
  SColumnInfo     *pTagColumnInfo;
  SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam;

H
Haojun Liao 已提交
345
typedef struct STableScanInfo {
H
Haojun Liao 已提交
346
  SQueryRuntimeEnv *pRuntimeEnv;
H
Haojun Liao 已提交
347

H
Haojun Liao 已提交
348 349 350 351 352 353 354 355 356 357 358 359
  void           *pQueryHandle;
  int32_t         numOfBlocks;
  int32_t         numOfSkipped;
  int32_t         numOfBlockStatis;
  int64_t         numOfRows;
                 
  int32_t         order;        // scan order
  int32_t         times;        // repeat counts
  int32_t         current;
  int32_t         reverseTimes; // 0 by default

  SQLFunctionCtx *pCtx;         // next operator query context
H
Haojun Liao 已提交
360
  SResultRowInfo *pResultRowInfo;
H
Haojun Liao 已提交
361 362
  int32_t        *rowCellInfoOffset;
  SExprInfo      *pExpr;
H
Haojun Liao 已提交
363 364 365
  SSDataBlock     block;
  bool            loadExternalRows; // load external rows (prev & next rows)
  bool            externalLoaded;   // external rows loaded
H
Haojun Liao 已提交
366
  int32_t         numOfOutput;
H
Haojun Liao 已提交
367
  int64_t         elapsedTime;
H
Haojun Liao 已提交
368

H
Haojun Liao 已提交
369
  int32_t         tableIndex;
H
Haojun Liao 已提交
370 371
} STableScanInfo;

372 373 374
/*
 * State of a tag scan: the column list, the result block and a table counter
 * with its current index.
 * NOTE(review): currentIndex presumably advances towards totalTables -- confirm
 * against the tag scan implementation.
 */
typedef struct STagScanInfo {
  SColumnInfo* pCols;
  SSDataBlock* pRes;
  int32_t      totalTables;
  int32_t      currentIndex;
} STagScanInfo;

H
Haojun Liao 已提交
379
typedef struct SOptrBasicInfo {
H
Haojun Liao 已提交
380
  SResultRowInfo    resultRowInfo;
H
Haojun Liao 已提交
381
  int32_t          *rowCellInfoOffset;  // offset value for each row result cell info
H
Haojun Liao 已提交
382
  SQLFunctionCtx   *pCtx;
H
Haojun Liao 已提交
383
  SSDataBlock      *pRes;
H
Haojun Liao 已提交
384 385
} SOptrBasicInfo;

H
Haojun Liao 已提交
386 387 388 389 390 391
// The table interval operator needs no state beyond the basic operator info, so it is a plain alias.
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;

/*
 * State of the aggregate operator: the basic operator info plus a seed value.
 * NOTE(review): what `seed` is used for is not visible in this header -- confirm.
 */
typedef struct SAggOperatorInfo {
  SOptrBasicInfo binfo;
  uint32_t       seed;
} SAggOperatorInfo;
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393
typedef struct SArithOperatorInfo {
H
Haojun Liao 已提交
394 395
  SOptrBasicInfo binfo;
  int32_t        bufCapacity;
H
Haojun Liao 已提交
396
  uint32_t       seed;
H
Haojun Liao 已提交
397 398
} SArithOperatorInfo;

H
Haojun Liao 已提交
399 400 401 402 403 404 405 406 407
/*
 * State of the limit operator.
 * NOTE(review): presumably `limit` is the row limit and `total` the number of
 * rows produced so far -- confirm against the operator implementation.
 */
typedef struct SLimitOperatorInfo {
  int64_t limit;
  int64_t total;
} SLimitOperatorInfo;

/* State of the offset operator: a single 64-bit offset value. */
typedef struct SOffsetOperatorInfo {
  int64_t offset;
} SOffsetOperatorInfo;

H
Haojun Liao 已提交
408
typedef struct SFillOperatorInfo {
H
Haojun Liao 已提交
409 410
  SSDataBlock *pRes;
  int64_t      totalInputRows;
H
Haojun Liao 已提交
411 412
} SFillOperatorInfo;

H
Haojun Liao 已提交
413 414 415 416
/*
 * State of the group-by operator: the basic operator info plus a column index.
 * NOTE(review): presumably colIndex identifies the group-by column -- confirm.
 */
typedef struct SGroupbyOperatorInfo {
  SOptrBasicInfo binfo;
  int32_t        colIndex;
} SGroupbyOperatorInfo;
H
Haojun Liao 已提交
417

H
Haojun Liao 已提交
418 419 420 421
/* Releases an SQueryParam; see the implementation for which members are freed. */
void freeParam(SQueryParam *param);

/* Converts a query message into `param`; returns an int32_t status code. */
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);

/*
 * Builds the expression list for a query message. The result is delivered
 * through the pExprInfo output parameter; the return value is an int32_t
 * status code.
 */
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
                                   SColumnInfo* pTagCols);

/*
 * Same shape as createQueryFuncExprFromMsg, but takes the previous expression
 * list (prevExpr) instead of the tag column list.
 */
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
                                           SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);

H
Haojun Liao 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
/* Builds the group-by description for a query message; an error code is reported through `code`. */
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);

/* Creates the SQInfo handle from the already-converted pieces of a query message. */
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
                        SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);

/* Initializes pQInfo against the given tsdb handle and vnode id; returns an int32_t status code. */
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);

/* Releases an array of numOfFilters column filter descriptions. */
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);

bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQuery *pQuery, int8_t status);

bool onlyQueryTags(SQuery* pQuery);
void buildTagQueryResult(SQInfo *pQInfo);
void stableQueryImpl(SQInfo *pQInfo);
void buildTableBlockDistResult(SQInfo *pQInfo);
void tableQueryImpl(SQInfo *pQInfo);
bool isValidQInfo(void *param);

/* Dumps the query result into the caller-provided buffer `data`; returns an int32_t status code. */
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);

/* Returns the result size and reports the number of rows through numOfRows. */
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);

/*
 * Declared with an explicit (void): in C an empty parameter list leaves the
 * parameters unspecified, so calls with stray arguments would not be diagnosed.
 */
int32_t getMaximumIdleDurationSec(void);

452
#endif  // TDENGINE_QUERYEXECUTOR_H