/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H

#include "os.h"

#include "hash.h"
#include "qinterpolation.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
#include "tarray.h"

/*
 * Counted buffer: a 32-bit count followed by a C99 flexible array member.
 * NOTE(review): whether `num` counts bytes or elements of `data` is not
 * visible in this header -- confirm against the allocation sites.
 */
typedef struct SData {
  int32_t num;
  char    data[];  // flexible array member: contributes no size to sizeof(SData)
} SData;

/*
 * Query status codes.
 * The value 0 (ST_QUERY_KILLED) is currently disabled and kept only as a
 * commented-out entry, so the live values start at 1.
 */
enum {
//  ST_QUERY_KILLED = 0,     // query killed
  ST_QUERY_PAUSED = 1,     // query paused because the response buffer is full
  ST_QUERY_COMPLETED = 2,  // query completed
};

/* Forward declaration so the filter callback can take a pointer to the element. */
struct SColumnFilterElem;

/*
 * Column filter predicate.
 *
 * @param pFilter  filter element the predicate is evaluated for
 * @param val1     first value operand
 * @param val2     second value operand
 * @return boolean result of the predicate
 *
 * NOTE(review): the exact role of val1/val2 (e.g. range bounds vs. data
 * values) is not visible in this header -- confirm against the implementations.
 */
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);

/*
 * Search callback over a data block.
 *
 * @param data   buffer to search
 * @param num    count associated with `data`
 * @param key    64-bit key to look for
 * @param order  ordering selector
 * @return 32-bit result
 *
 * NOTE(review): presumably returns the position of `key` inside `data`;
 * the not-found convention is not visible here -- confirm.
 */
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);

/*
 * Description of a GROUP BY clause together with its ordering column.
 * columnInfo has room for TSDB_MAX_TAGS entries.
 */
typedef struct SSqlGroupbyExpr {
  int16_t     tableIndex;      // NOTE(review): presumably the index of the table the clause applies to -- confirm
  int16_t     numOfGroupCols;  // NOTE(review): presumably the number of valid entries in columnInfo -- confirm
  SColIndexEx columnInfo[TSDB_MAX_TAGS];  // group-by column information
  int16_t     orderIndex;                 // index of the order-by column
  int16_t     orderType;                  // order-by direction: asc/desc
} SSqlGroupbyExpr;

/*
 * Page/row coordinate pair.
 * SWindowResult uses it to record where a result lives in the disk-based
 * output buffer.
 */
typedef struct SPosInfo {
  int16_t pageId;  // page identifier
  int16_t rowId;   // row identifier within the page
} SPosInfo;

/* Status flags attached to a single window result. */
typedef struct SWindowStatus {
  bool closed;  // NOTE(review): presumably set once the window is complete and its result can be returned -- confirm
} SWindowStatus;

/* Result of one time window: where it is stored, what it covers, and its status. */
typedef struct SWindowResult {
  uint16_t      numOfRows;   // number of rows in this result
  SPosInfo      pos;         // position of the current result in the disk-based output buffer
  SResultInfo*  resultInfo;  // one resultInfo per result column
  STimeWindow   window;      // the time window that the current result covers
  SWindowStatus status;      // status flags of this window
} SWindowResult;

/*
 * Pair of 64-bit point counters.
 * NOTE(review): the distinction between the two counters (e.g. cumulative
 * vs. per-retrieve) is not visible in this header -- confirm.
 */
typedef struct SResultRec {
  int64_t pointsTotal;
  int64_t pointsRead;
} SResultRec;

/* Collection of window results plus the bookkeeping needed to look them up. */
typedef struct SWindowResInfo {
  SWindowResult* pResult;    // result list
  void*          hashList;   // hash list for quick access
  int16_t        type;       // data type of the hash key
  int32_t        capacity;   // maximum capacity
  int32_t        curIndex;   // current start active index
  int32_t        size;       // number of results in the set
  int64_t        startTime;  // start time of the first time window for a sliding query
  int64_t        prevSKey;   // start key of the previous (not yet completed) sliding window
  int64_t        threshold;  // threshold for pausing the query and returning closed results
} SWindowResInfo;

/* One filter on a column: the filter description and the predicate callback (see __filter_func_t). */
typedef struct SColumnFilterElem {
  int16_t           bytes;  // column length
  __filter_func_t   fp;     // predicate callback
  SColumnFilterInfo filterInfo;  // filter description
} SColumnFilterElem;

/* All filters attached to a single column. */
typedef struct SSingleColumnFilterInfo {
  SColumnInfoEx      info;          // column description
  int32_t            numOfFilters;  // NOTE(review): presumably the number of elements in pFilters -- confirm
  SColumnFilterElem* pFilters;      // filter elements for this column
  void*              pData;         // NOTE(review): presumably the column data the filters run against -- confirm
} SSingleColumnFilterInfo;

/* Intermediate position kept for a table while a multimeter query that involves an interval is running. */
typedef struct STableQueryInfo {
  int64_t     lastKey;
  STimeWindow win;
  int32_t     numOfRes;
  int16_t     queryRangeSet;  // whether the query range has been set; only meaningful for interval queries
  int64_t     tag;
  STSCursor   cur;
  int32_t     sid;  // used to retrieve the page id list

  SWindowResInfo windowResInfo;  // window results held in this structure
} STableQueryInfo;

/* Per-table block range and group placement, linked to the table's STableQueryInfo. */
typedef struct STableDataInfo {
  int32_t          numOfBlocks;
  int32_t          start;  // start block index
  int32_t          tableIndex;
  void*            pMeterObj;  // opaque pointer; the concrete type is not visible in this header
  int32_t          groupIdx;  // group id in table list
  STableQueryInfo* pTableQInfo;
} STableDataInfo;

typedef struct SQuery {
  int16_t           numOfCols;
  SOrderVal         order;
  STimeWindow       window;
  int64_t           intervalTime;
  int64_t           slidingTime;      // sliding time for sliding window query
  char              slidingTimeUnit;  // interval data type, used for daytime revise
  int8_t            precision;
  int16_t           numOfOutputCols;
  int16_t           interpoType;
  int16_t           checkBufferInLoop;  // check if the buffer is full during scan each block
  SLimitVal         limit;
  int32_t           rowSize;
  SSqlGroupbyExpr*  pGroupbyExpr;
  SSqlFunctionExpr* pSelectExpr;
  SColumnInfoEx*    colList;
  int32_t           numOfFilterCols;
  int64_t*          defaultVal;
  TSKEY             lastKey;
  uint32_t          status;  // query status
  SResultRec        rec;
  int32_t           pos;
  int64_t           pointsOffset;  // the number of points offset to save read data
H
hjxilinx 已提交
145 146
  SData**           sdata;
  int32_t           capacity;
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
  SSingleColumnFilterInfo* pFilterInfo;
} SQuery;

/*
 * Placeholder for query cost statistics; it currently has no members.
 * NOTE(review): a struct without members is not valid ISO C and relies on a
 * compiler extension (GCC accepts it as a zero-sized struct). Adding a member
 * changes the layout of SQueryRuntimeEnv, which embeds this struct by value.
 */
typedef struct SQueryCostSummary {
} SQueryCostSummary;

/*
 * Runtime environment of a query: the SQuery being executed plus the
 * contexts, buffers, and cursors used while it runs.
 */
typedef struct SQueryRuntimeEnv {
  SResultInfo*       resultInfo;  // todo refactor to merge with SWindowResInfo
  SQuery*            pQuery;      // the query this environment belongs to
  SData**            pInterpoBuf;
  SQLFunctionCtx*    pCtx;
  int16_t            numOfRowsPerPage;
  int16_t            offset[TSDB_MAX_COLUMNS];
  uint16_t           scanFlag;  // denotes whether the data is scanned in reverse or not
  SInterpolationInfo interpoInfo;
  SWindowResInfo     windowResInfo;
  STSBuf*            pTSBuf;
  STSCursor          cur;
  SQueryCostSummary  summary;
  bool               stableQuery;  // super table query or not
  void*              pQueryHandle;  // opaque handle; the concrete type is not visible in this header

  SDiskbasedResultBuf* pResultBuf;  // disk-file-based, block-wise query result buffer
} SQueryRuntimeEnv;

typedef struct SQInfo {
H
hjxilinx 已提交
173
  void*            signature;
174
  void*            pVnode;
175
  TSKEY            startTime;
H
hjxilinx 已提交
176
  TSKEY            elapsedTime;
177
  SResultRec       rec;
H
hjxilinx 已提交
178 179
  int32_t          pointsInterpo;
  int32_t          code;   // error code to returned to client
H
hjxilinx 已提交
180
//  int32_t          killed; // denotes if current query is killed
181
  sem_t            dataReady;
H
hjxilinx 已提交
182
  SArray*          pTableIdList;  // table list
183 184 185
  SQueryRuntimeEnv runtimeEnv;
  int32_t          subgroupIdx;
  int32_t          offset; /* offset in group result set of subgroup */
H
hjxilinx 已提交
186
  
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
  T_REF_DECLARE()
  /*
   * the query is executed position on which meter of the whole list.
   * when the index reaches the last one of the list, it means the query is completed.
   * We later may refactor to remove this attribution by using another flag to denote
   * whether a multimeter query is completed or not.
   */
  int32_t         tableIndex;
  int32_t         numOfGroupResultPages;
  STableDataInfo* pTableDataInfo;
  TSKEY*          tsList;
} SQInfo;

/**
 * create the qinfo object before adding the query task to each tsdb query worker
 *
 * @param pReadMsg
 * @param pQInfo
 * @return
 */
207
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
208 209 210 211 212

/**
 * query on single table
 * @param pReadMsg
 */
H
hjxilinx 已提交
213
void qTableQuery(SQInfo* pQInfo);
214 215 216 217 218 219 220

/**
 * query on super table
 * @param pReadMsg  opaque pointer describing the query
 *                  (NOTE(review): the concrete type is not visible in this
 *                  header, and the sibling qTableQuery takes an SQInfo* -- confirm)
 */
void qSuperTableQuery(void* pReadMsg);

H
hjxilinx 已提交
221 222 223 224 225 226
/**
 * wait for the query completed, and retrieve final results to client
 * @param pQInfo
 */
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);

227 228 229 230 231 232 233
/**
 *
 * @param pQInfo
 * @param pRsp
 * @return
 */
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
H
hjxilinx 已提交
234

235
#endif  // TDENGINE_QUERYEXECUTOR_H