qExecutor.h 12.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H

#include "os.h"

#include "hash.h"
H
Haojun Liao 已提交
21
#include "qAggMain.h"
H
Haojun Liao 已提交
22 23
#include "qFill.h"
#include "qResultbuf.h"
24
#include "qSqlparser.h"
H
Haojun Liao 已提交
25
#include "qTsbuf.h"
26
#include "query.h"
27
#include "taosdef.h"
H
Haojun Liao 已提交
28
#include "tarray.h"
B
Bomin Zhang 已提交
29
#include "tlockfree.h"
H
Haojun Liao 已提交
30
#include "tsdb.h"
31 32

struct SColumnFilterElem;
33
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
H
hjxilinx 已提交
34
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
35

H
Haojun Liao 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s)  (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)

#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))

#define GET_TABLEGROUP(q, _index)   ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))

enum {
  // when query starts to execute, this status will set
      QUERY_NOT_COMPLETED = 0x1u,

  /* result output buffer is full, current query is paused.
   * this status is only exist in group-by clause and diff/add/division/multiply/ query.
   */
      QUERY_RESBUF_FULL = 0x2u,

  /* query is over
   * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
   * 2. when all data within queried time window, it is also denoted as query_completed
   */
      QUERY_COMPLETED = 0x4u,

  /* when the result is not completed return to client, this status will be
   * usually used in case of interval query with interpolation option
   */
      QUERY_OVER = 0x8u,
};

H
Haojun Liao 已提交
66
typedef struct SResultRowPool {
67 68 69 70 71 72 73 74 75 76
  int32_t elemSize;
  int32_t blockSize;
  int32_t numOfElemPerBlock;

  struct {
    int32_t blockIndex;
    int32_t pos;
  } position;

  SArray* pData;    // SArray<void*>
H
Haojun Liao 已提交
77
} SResultRowPool;
78

79
typedef struct SSqlGroupbyExpr {
H
Haojun Liao 已提交
80 81 82 83 84
  int16_t tableIndex;
  SArray* columnInfo;  // SArray<SColIndex>, group by columns information
  int16_t numOfGroupCols;
  int16_t orderIndex;  // order by column index
  int16_t orderType;   // order by type: asc/desc
85 86
} SSqlGroupbyExpr;

H
Haojun Liao 已提交
87
typedef struct SResultRow {
88
  int32_t       pageId;      // pageId & rowId is the position of current result in disk-based output buffer
89 90 91 92 93
  int32_t       rowId:29;    // row index in buffer page
  bool          startInterp; // the time window start timestamp has done the interpolation already.
  bool          endInterp;   // the time window end timestamp has done the interpolation already.
  bool          closed;      // this result status: closed or opened
  uint32_t      numOfRows;   // number of rows of current time window
H
Haojun Liao 已提交
94
  SResultRowCellInfo*  pCellInfo;  // For each result column, there is a resultInfo
95
  union {STimeWindow win; char* key;};  // start key of current time window
H
Haojun Liao 已提交
96
} SResultRow;
97

98
typedef struct SGroupResInfo {
H
Haojun Liao 已提交
99 100
  int32_t totalGroup;
  int32_t currentGroup;
101 102 103 104
  int32_t index;
  SArray* pRows;      // SArray<SResultRow*>
} SGroupResInfo;

H
Haojun Liao 已提交
105 106 107 108
/**
 * If the number of generated results is greater than this value,
 * query query will be halt and return results to client immediate.
 */
109
typedef struct SResultRec {
H
Haojun Liao 已提交
110 111 112 113
  int64_t total;      // total generated result size in rows
  int64_t rows;       // current result set size in rows
  int64_t capacity;   // capacity of current result output buffer
  int32_t threshold;  // result size threshold in rows.
114 115
} SResultRec;

H
Haojun Liao 已提交
116
typedef struct SResultRowInfo {
H
Haojun Liao 已提交
117 118 119 120 121 122
  SResultRow** pResult;    // result list
  int16_t      type:8;     // data type for hash key
  int32_t      size:24;    // number of result set
  int32_t      capacity;   // max capacity
  int32_t      curIndex;   // current start active index
  int64_t      prevSKey;   // previous (not completed) sliding window start key
H
Haojun Liao 已提交
123
} SResultRowInfo;
124 125 126 127 128 129 130 131

typedef struct SColumnFilterElem {
  int16_t           bytes;  // column length
  __filter_func_t   fp;
  SColumnFilterInfo filterInfo;
} SColumnFilterElem;

typedef struct SSingleColumnFilterInfo {
H
Haojun Liao 已提交
132
  void*              pData;
133
  int32_t            numOfFilters;
H
Haojun Liao 已提交
134
  SColumnInfo        info;
135 136 137
  SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo;

H
Haojun Liao 已提交
138
typedef struct STableQueryInfo {
H
hjxilinx 已提交
139
  TSKEY       lastKey;
H
Haojun Liao 已提交
140
  int32_t     groupIndex;     // group id in table list
141
  int16_t     queryRangeSet;  // denote if the query range is set, only available for interval query
142
  tVariant    tag;
H
hjxilinx 已提交
143
  STimeWindow win;
144
  STSCursor   cur;
H
Haojun Liao 已提交
145
  void*       pTable;         // for retrieve the page id list
H
Haojun Liao 已提交
146
  SResultRowInfo resInfo;
147 148
} STableQueryInfo;

H
Haojun Liao 已提交
149 150 151 152 153 154 155 156 157
typedef struct SQueryCostInfo {
  uint64_t loadStatisTime;
  uint64_t loadFileBlockTime;
  uint64_t loadDataInCacheTime;
  uint64_t loadStatisSize;
  uint64_t loadFileBlockSize;
  uint64_t loadDataInCacheSize;
  
  uint64_t loadDataTime;
158 159 160 161
  uint64_t totalRows;
  uint64_t totalCheckedRows;
  uint32_t totalBlocks;
  uint32_t loadBlocks;
H
Haojun Liao 已提交
162 163
  uint32_t loadBlockStatis;
  uint32_t discardBlocks;
164
  uint64_t elapsedTime;
H
Haojun Liao 已提交
165
  uint64_t firstStageMergeTime;
H
Haojun Liao 已提交
166 167
  uint64_t winInfoSize;
  uint64_t tableInfoSize;
H
Haojun Liao 已提交
168
  uint64_t hashSize;
169
  uint64_t numOfTimeWindows;
H
Haojun Liao 已提交
170
} SQueryCostInfo;
H
hjxilinx 已提交
171

H
Haojun Liao 已提交
172 173 174 175 176
typedef struct {
  int64_t vgroupLimit;
  int64_t ts;
} SOrderedPrjQueryInfo;

177 178 179 180 181
typedef struct {
  char*   tags;
  SArray* pResult;  // SArray<SStddevInterResult>
} SInterResult;

182
typedef struct SQuery {
H
Haojun Liao 已提交
183 184 185 186
  int16_t          numOfCols;
  int16_t          numOfTags;
  SOrderVal        order;
  STimeWindow      window;
187
  SInterval        interval;
188
  int16_t          precision;
H
Haojun Liao 已提交
189 190
  int16_t          numOfOutput;
  int16_t          fillType;
191
  int16_t          checkResultBuf;   // check if the buffer is full during scan each block
H
Haojun Liao 已提交
192
  SLimitVal        limit;
193 194 195 196 197 198

  int32_t          srcRowSize;       // todo extract struct
  int32_t          resultRowSize;
  int32_t          maxSrcColumnSize;
  int32_t          tagLen;           // tag value length of current query

H
Haojun Liao 已提交
199
  SSqlGroupbyExpr* pGroupbyExpr;
H
Haojun Liao 已提交
200
  SExprInfo*       pExpr1;
H
Haojun Liao 已提交
201 202 203
  SExprInfo*       pExpr2;
  int32_t          numOfExpr2;

H
Haojun Liao 已提交
204 205 206 207
  SColumnInfo*     colList;
  SColumnInfo*     tagColList;
  int32_t          numOfFilterCols;
  int64_t*         fillVal;
208
  uint32_t         status;             // query status
H
Haojun Liao 已提交
209 210 211 212
  SResultRec       rec;
  int32_t          pos;
  tFilePage**      sdata;
  STableQueryInfo* current;
213
  int32_t          numOfCheckedBlocks; // number of check data blocks
H
Haojun Liao 已提交
214

215
  SOrderedPrjQueryInfo prjInfo;        // limit value for each vgroup, only available in global order projection query.
216 217 218 219
  SSingleColumnFilterInfo* pFilterInfo;
} SQuery;

typedef struct SQueryRuntimeEnv {
H
Haojun Liao 已提交
220
  jmp_buf              env;
H
Haojun Liao 已提交
221 222
  SQuery*              pQuery;
  SQLFunctionCtx*      pCtx;
223
  int32_t              numOfRowsPerPage;
H
Haojun Liao 已提交
224
  uint16_t*            offset;
H
Haojun Liao 已提交
225
  uint16_t             scanFlag;         // denotes reversed scan of data or not
H
Haojun Liao 已提交
226
  SFillInfo*           pFillInfo;
H
Haojun Liao 已提交
227
  SResultRowInfo       resultRowInfo;
228

229
  SQueryCostInfo       summary;
H
Haojun Liao 已提交
230 231
  void*                pQueryHandle;
  void*                pSecQueryHandle;  // another thread for
H
Haojun Liao 已提交
232
  bool                 stableQuery;      // super table query or not
233
  bool                 topBotQuery;      // TODO used bitwise flag
234
  bool                 groupbyColumn;    // denote if this is a groupby normal column query
H
Haojun Liao 已提交
235
  bool                 hasTagResults;    // if there are tag values in final result or not
236
  bool                 timeWindowInterpo;// if the time window start/end required interpolation
237
  bool                 queryWindowIdentical; // all query time windows are identical for all tables in one group
238 239
  bool                 queryBlockDist;    // if query data block distribution
  bool                 stabledev;        // super table stddev query
H
Haojun Liao 已提交
240
  int32_t              interBufSize;     // intermediate buffer sizse
H
Haojun Liao 已提交
241
  int32_t              prevGroupId;      // previous executed group id
H
Haojun Liao 已提交
242
  SDiskbasedResultBuf* pResultBuf;       // query result buffer based on blocked-wised disk file
H
Haojun Liao 已提交
243
  SHashObj*            pResultRowHashTable; // quick locate the window object for each result
H
Haojun Liao 已提交
244
  char*                keyBuf;           // window key buffer
245
  SResultRowPool*      pool;             // window result object pool
H
Haojun Liao 已提交
246 247

  int32_t*             rowCellInfoOffset;// offset value for each row result cell info
248
  char**               prevRow;
249

250 251 252 253 254
  SArray*              prevResult;       // intermediate result, SArray<SInterResult>
  STSBuf*              pTsBuf;           // timestamp filter list
  STSCursor            cur;

  char*                tagVal;           // tag value of current data block
255
  SArithmeticSupport  *sasArray;
256 257
} SQueryRuntimeEnv;

258 259 260 261 262
enum {
  QUERY_RESULT_NOT_READY = 1,
  QUERY_RESULT_READY     = 2,
};

263
typedef struct SQInfo {
H
Haojun Liao 已提交
264
  void*            signature;
H
Haojun Liao 已提交
265
  int32_t          code;   // error code to returned to client
266
  int64_t          owner;  // if it is in execution
H
Haojun Liao 已提交
267
  void*            tsdb;
Y
TD-1733  
yihaoDeng 已提交
268
  SMemRef          memRef; 
H
Haojun Liao 已提交
269
  int32_t          vgId;
H
Haojun Liao 已提交
270
  STableGroupInfo  tableGroupInfo;       // table <tid, last_key> list  SArray<STableKeyInfo>
271
  STableGroupInfo  tableqinfoGroupInfo;  // this is a group array list, including SArray<STableQueryInfo*> structure
272
  SQueryRuntimeEnv runtimeEnv;
273
  SHashObj*        arrTableIdInfo;
H
Haojun Liao 已提交
274
  int32_t          groupIndex;
H
Haojun Liao 已提交
275

276 277 278 279
  /*
   * the query is executed position on which meter of the whole list.
   * when the index reaches the last one of the list, it means the query is completed.
   */
H
Haojun Liao 已提交
280
  int32_t          tableIndex;
H
Haojun Liao 已提交
281
  SGroupResInfo    groupResInfo;
282
  void*            pBuf;        // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
H
Haojun Liao 已提交
283

284
  pthread_mutex_t  lock;        // used to synchronize the rsp/query threads
H
Haojun Liao 已提交
285
  tsem_t           ready;
286 287
  int32_t          dataReady;   // denote if query result is ready or not
  void*            rspContext;  // response context
288
  int64_t          startExecTs; // start to exec timestamp
289
  char*            sql;         // query sql string
290 291
} SQInfo;

H
Haojun Liao 已提交
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
typedef struct SQueryParam {
  char            *sql;
  char            *tagCond;
  char            *tbnameCond;
  char            *prevResult;
  SArray          *pTableIdList;
  SSqlFuncMsg    **pExprMsg;
  SSqlFuncMsg    **pSecExprMsg;
  SExprInfo       *pExprs;
  SExprInfo       *pSecExprs;

  SColIndex       *pGroupColIndex;
  SColumnInfo     *pTagColumnInfo;
  SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam;

void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
                                   SColumnInfo* pTagCols);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
                        SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);

bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQuery *pQuery, int8_t status);

bool onlyQueryTags(SQuery* pQuery);
void buildTagQueryResult(SQInfo *pQInfo);
void stableQueryImpl(SQInfo *pQInfo);
void buildTableBlockDistResult(SQInfo *pQInfo);
void tableQueryImpl(SQInfo *pQInfo);
bool isValidQInfo(void *param);

int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);

size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);

int32_t getMaximumIdleDurationSec();

339
#endif  // TDENGINE_QUERYEXECUTOR_H