queryExecutor.h 7.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H

#include "os.h"

#include "hash.h"
#include "qinterpolation.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
H
hjxilinx 已提交
28
#include "tarray.h"
29 30 31 32 33 34 35

typedef struct SData {
  int32_t num;
  char    data[];
} SData;

enum {
H
hjxilinx 已提交
36
//  ST_QUERY_KILLED = 0,     // query killed
37 38 39 40 41 42
  ST_QUERY_PAUSED = 1,     // query paused, due to full of the response buffer
  ST_QUERY_COMPLETED = 2,  // query completed
};

struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
H
hjxilinx 已提交
43
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70

typedef struct SSqlGroupbyExpr {
  int16_t     tableIndex;
  int16_t     numOfGroupCols;
  SColIndexEx columnInfo[TSDB_MAX_TAGS];  // group by columns information
  int16_t     orderIndex;                 // order by column index
  int16_t     orderType;                  // order by type: asc/desc
} SSqlGroupbyExpr;

typedef struct SPosInfo {
  int16_t pageId;
  int16_t rowId;
} SPosInfo;

typedef struct SWindowStatus {
  bool closed;
} SWindowStatus;

typedef struct SWindowResult {
  uint16_t      numOfRows;
  SPosInfo      pos;         // Position of current result in disk-based output buffer
  SResultInfo*  resultInfo;  // For each result column, there is a resultInfo
  STimeWindow   window;      // The time window that current result covers.
  SWindowStatus status;
} SWindowResult;

typedef struct SResultRec {
H
hjxilinx 已提交
71 72 73 74
  int64_t total;
  int64_t size;
  int64_t capacity;
  int32_t threshold;   // the threshold size, when the number of rows in result buffer, return to client
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
} SResultRec;

typedef struct SWindowResInfo {
  SWindowResult* pResult;    // result list
  void*          hashList;   // hash list for quick access
  int16_t        type;       // data type for hash key
  int32_t        capacity;   // max capacity
  int32_t        curIndex;   // current start active index
  int32_t        size;       // number of result set
  int64_t        startTime;  // start time of the first time window for sliding query
  int64_t        prevSKey;   // previous (not completed) sliding window start key
  int64_t        threshold;  // threshold to pausing query and return closed results.
} SWindowResInfo;

typedef struct SColumnFilterElem {
  int16_t           bytes;  // column length
  __filter_func_t   fp;
  SColumnFilterInfo filterInfo;
} SColumnFilterElem;

typedef struct SSingleColumnFilterInfo {
  SColumnInfoEx      info;
  int32_t            numOfFilters;
  SColumnFilterElem* pFilters;
  void*              pData;
} SSingleColumnFilterInfo;

/* intermediate pos during multimeter query involves interval */
typedef struct STableQueryInfo {
  int64_t     lastKey;
  STimeWindow win;
  int32_t     numOfRes;
  int16_t     queryRangeSet;  // denote if the query range is set, only available for interval query
  int64_t     tag;
  STSCursor   cur;
  int32_t     sid;  // for retrieve the page id list

  SWindowResInfo windowResInfo;
} STableQueryInfo;

typedef struct STableDataInfo {
  int32_t          numOfBlocks;
H
hjxilinx 已提交
117
  int32_t          start;     // start block index
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
  int32_t          tableIndex;
  void*            pMeterObj;
  int32_t          groupIdx;  // group id in table list
  STableQueryInfo* pTableQInfo;
} STableDataInfo;

typedef struct SQuery {
  int16_t           numOfCols;
  SOrderVal         order;
  STimeWindow       window;
  int64_t           intervalTime;
  int64_t           slidingTime;      // sliding time for sliding window query
  char              slidingTimeUnit;  // interval data type, used for daytime revise
  int8_t            precision;
  int16_t           numOfOutputCols;
  int16_t           interpoType;
  int16_t           checkBufferInLoop;  // check if the buffer is full during scan each block
  SLimitVal         limit;
  int32_t           rowSize;
  SSqlGroupbyExpr*  pGroupbyExpr;
  SSqlFunctionExpr* pSelectExpr;
  SColumnInfoEx*    colList;
  int32_t           numOfFilterCols;
  int64_t*          defaultVal;
  TSKEY             lastKey;
  uint32_t          status;  // query status
  SResultRec        rec;
  int32_t           pos;
  int64_t           pointsOffset;  // the number of points offset to save read data
H
hjxilinx 已提交
147
  SData**           sdata;
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
  SSingleColumnFilterInfo* pFilterInfo;
} SQuery;

typedef struct SQueryCostSummary {
} SQueryCostSummary;

typedef struct SQueryRuntimeEnv {
  SResultInfo*       resultInfo;  // todo refactor to merge with SWindowResInfo
  SQuery*            pQuery;
  SData**            pInterpoBuf;
  SQLFunctionCtx*    pCtx;
  int16_t            numOfRowsPerPage;
  int16_t            offset[TSDB_MAX_COLUMNS];
  uint16_t           scanFlag;  // denotes reversed scan of data or not
  SInterpolationInfo interpoInfo;
  SWindowResInfo     windowResInfo;
  STSBuf*            pTSBuf;
  STSCursor          cur;
  SQueryCostSummary  summary;
  bool               stableQuery;  // super table query or not
  void*              pQueryHandle;

  SDiskbasedResultBuf* pResultBuf;  // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;

typedef struct SQInfo {
H
hjxilinx 已提交
174
  void*            signature;
H
[TD-32]  
hjxilinx 已提交
175
//  void*            param;         // pointer to the RpcReadMsg
176
  TSKEY            startTime;
H
hjxilinx 已提交
177
  TSKEY            elapsedTime;
H
hjxilinx 已提交
178
  int32_t          pointsInterpo;
H
hjxilinx 已提交
179
  int32_t          code;          // error code to returned to client
180
  sem_t            dataReady;
H
hjxilinx 已提交
181
  SArray*          pTableIdList;  // table id list
182 183 184
  SQueryRuntimeEnv runtimeEnv;
  int32_t          subgroupIdx;
  int32_t          offset; /* offset in group result set of subgroup */
H
hjxilinx 已提交
185
  
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
  T_REF_DECLARE()
  /*
   * the query is executed position on which meter of the whole list.
   * when the index reaches the last one of the list, it means the query is completed.
   * We later may refactor to remove this attribution by using another flag to denote
   * whether a multimeter query is completed or not.
   */
  int32_t         tableIndex;
  int32_t         numOfGroupResultPages;
  STableDataInfo* pTableDataInfo;
  TSKEY*          tsList;
} SQInfo;

/**
 * create the qinfo object before adding the query task to each tsdb query worker
 *
 * @param pReadMsg
 * @param pQInfo
 * @return
 */
H
hjxilinx 已提交
206
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo);
207 208 209 210 211

/**
 * query on single table
 * @param pReadMsg
 */
H
hjxilinx 已提交
212
void qTableQuery(SQInfo* pQInfo);
213 214 215 216 217 218 219

/**
 * query on super table
 * @param pReadMsg
 */
void qSuperTableQuery(void* pReadMsg);

H
hjxilinx 已提交
220 221 222 223
/**
 * wait for the query completed, and retrieve final results to client
 * @param pQInfo
 */
H
hjxilinx 已提交
224
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
H
hjxilinx 已提交
225

226 227 228 229 230 231 232
/**
 *
 * @param pQInfo
 * @param pRsp
 * @return
 */
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
H
hjxilinx 已提交
233

H
hjxilinx 已提交
234 235 236 237 238
/**
 *
 * @param pQInfo
 * @return
 */
H
[TD-32]  
hjxilinx 已提交
239
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
H
hjxilinx 已提交
240

241
#endif  // TDENGINE_QUERYEXECUTOR_H