vnodeQueryImpl.h 11.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#ifndef TDENGINE_VNODEQUERYIMPL_H
#define TDENGINE_VNODEQUERYIMPL_H
H
hzcheng 已提交
18 19 20 21 22

#ifdef __cplusplus
extern "C" {
#endif

23
#include "os.h"
H
hzcheng 已提交
24

25 26
#include "hash.h"
#include "hashutil.h"
H
hzcheng 已提交
27

H
hjxilinx 已提交
28
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
H
hzcheng 已提交
29 30
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)

H
hjxilinx 已提交
31 32 33 34 35
/*
 * set the output buffer page size is 16k
 * The page size should be sufficient for at least one output result or intermediate result.
 * Some intermediate results may be extremely large, such as top/bottom(100) query.
 */
H
hjxilinx 已提交
36
#define DEFAULT_INTERN_BUF_SIZE 16384L
H
hjxilinx 已提交
37

H
hjxilinx 已提交
38 39
#define INIT_ALLOCATE_DISK_PAGES 60L
#define DEFAULT_DATA_FILE_MAPPING_PAGES 2L
H
hzcheng 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
#define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE)

#define IO_ENGINE_MMAP 0
#define IO_ENGINE_SYNC 1

#define DEFAULT_IO_ENGINE IO_ENGINE_SYNC

/**
 * check if the primary column is load by default, otherwise, the program will
 * forced to load primary column explicitly.
 */
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)

typedef enum {

  /*
   * the program will call this function again, if this status is set.
   * used to transfer from QUERY_RESBUF_FULL
   */
59
  QUERY_NOT_COMPLETED = 0x1u,
H
hzcheng 已提交
60 61 62 63 64 65 66

  /*
   * output buffer is full, so, the next query will be employed,
   * in this case, we need to set the appropriated start scan point for
   * the next query.
   *
   * this status is only exist in group-by clause and
H
hjxilinx 已提交
67
   * diff/add/division/multiply/ query.
H
hzcheng 已提交
68
   */
69
  QUERY_RESBUF_FULL = 0x2u,
H
hzcheng 已提交
70 71 72 73 74 75 76 77 78

  /*
   * query is over
   * 1. this status is used in one row result query process, e.g.,
   * count/sum/first/last/
   * avg...etc.
   * 2. when the query range on timestamp is satisfied, it is also denoted as
   * query_compeleted
   */
79
  QUERY_COMPLETED = 0x4u,
H
hzcheng 已提交
80 81 82 83 84

  /*
   * all data has been scanned, so current search is stopped,
   * At last, the function will transfer this status to QUERY_COMPLETED
   */
85
  QUERY_NO_DATA_TO_CHECK = 0x8u,
H
hzcheng 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
} vnodeQueryStatus;

typedef struct SPointInterpoSupporter {
  int32_t numOfCols;
  char**  pPrevPoint;
  char**  pNextPoint;
} SPointInterpoSupporter;

typedef struct SBlockInfo {
  TSKEY   keyFirst;
  TSKEY   keyLast;
  int32_t numOfCols;
  int32_t size;
} SBlockInfo;

typedef struct SMeterDataBlockInfoEx {
  SCompBlockFields pBlock;
  SMeterDataInfo*  pMeterDataInfo;
  int32_t          blockIndex;
  int32_t          groupIdx; /* number of group is less than the total number of meters */
} SMeterDataBlockInfoEx;

typedef enum {
  DISK_DATA_LOAD_FAILED = -0x1,
  DISK_DATA_LOADED = 0x0,
  DISK_DATA_DISCARDED = 0x01,
} vnodeDiskLoadStatus;

H
hjxilinx 已提交
114 115
#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
H
hzcheng 已提交
116 117 118 119 120 121
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)

typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);

static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
H
hjxilinx 已提交
122
  return *(SMeterObj**)taosGetDataFromHashTable(hashHandle, (const char*)&sid, sizeof(sid));
H
hzcheng 已提交
123 124 125 126 127
}

bool isQueryKilled(SQuery* pQuery);
bool isFixedOutputQuery(SQuery* pQuery);
bool isPointInterpoQuery(SQuery* pQuery);
L
lihui 已提交
128
bool isSumAvgRateQuery(SQuery *pQuery);
H
hzcheng 已提交
129 130
bool isTopBottomQuery(SQuery* pQuery);
bool isFirstLastRowQuery(SQuery* pQuery);
S
slguan 已提交
131
bool isTSCompQuery(SQuery* pQuery);
H
hjxilinx 已提交
132
bool notHasQueryTimeRange(SQuery* pQuery);
H
hzcheng 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150

bool needSupplementaryScan(SQuery* pQuery);
bool onDemandLoadDatablock(SQuery* pQuery, int16_t queryRangeSet);

void setQueryStatus(SQuery* pQuery, int8_t status);

bool doRevisedResultsByLimit(SQInfo* pQInfo);
void truncateResultByLimit(SQInfo* pQInfo, int64_t* final, int32_t* interpo);

void initCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void resetCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void forwardCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, int64_t output);

bool needPrimaryTimestampCol(SQuery* pQuery, SBlockInfo* pBlockInfo);
void vnodeScanAllData(SQueryRuntimeEnv* pRuntimeEnv);

int32_t vnodeQueryResultInterpolate(SQInfo* pQInfo, tFilePage** pDst, tFilePage** pDataSrc, int32_t numOfRows,
                                    int32_t* numOfInterpo);
H
hjxilinx 已提交
151
void    copyResToQueryResultBuf(STableQuerySupportObj* pSupporter, SQuery* pQuery);
H
hzcheng 已提交
152

H
hjxilinx 已提交
153 154
void    doSkipResults(SQueryRuntimeEnv* pRuntimeEnv);
void    doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv);
H
hzcheng 已提交
155 156 157 158
int64_t getNumOfResult(SQueryRuntimeEnv* pRuntimeEnv);

void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv);

H
hjxilinx 已提交
159
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySupportObj* pSupporter,
H
hjxilinx 已提交
160
                               SPointInterpoSupporter* pPointInterpSupporter, int64_t* key);
H
hzcheng 已提交
161 162 163 164 165 166

void pointInterpSupporterInit(SQuery* pQuery, SPointInterpoSupporter* pInterpoSupport);
void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport);

int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position);
H
hjxilinx 已提交
167 168 169
int32_t doCloseAllOpenedResults(STableQuerySupportObj* pSupporter);
void    disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
void    enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
H
hzcheng 已提交
170

H
hjxilinx 已提交
171
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
172
void    copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result);
H
hzcheng 已提交
173

H
hjxilinx 已提交
174 175
SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pBlock, int32_t type);
H
hzcheng 已提交
176

H
hjxilinx 已提交
177
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
H
hzcheng 已提交
178

H
hjxilinx 已提交
179
void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
H
hjxilinx 已提交
180 181 182 183
                               SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn);

int32_t vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo,
                                   int32_t* numOfMeters, SMeterDataInfo*** pReqMeterDataInfo);
184
int32_t vnodeGetVnodeHeaderFileIndex(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
H
hzcheng 已提交
185 186 187 188

int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMeters,
                               SMeterDataBlockInfoEx** pDataBlockInfoEx, int32_t numOfCompBlocks,
                               int32_t* nAllocBlocksInfoSize, int64_t addr);
H
hjxilinx 已提交
189
void    freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
H
hzcheng 已提交
190

H
hjxilinx 已提交
191 192 193
void    setExecutionContext(STableQuerySupportObj* pSupporter, SMeterQueryInfo* pMeterQueryInfo, int32_t meterIdx,
                            int32_t groupIdx, TSKEY nextKey);
int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo);
H
hjxilinx 已提交
194 195
void    doGetAlignedIntervalQueryRangeImpl(SQuery* pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
                                           int64_t* actualSkey, int64_t* actualEkey, int64_t* skey, int64_t* ekey);
H
hzcheng 已提交
196

H
hjxilinx 已提交
197
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
H
hzcheng 已提交
198

H
hjxilinx 已提交
199 200
int32_t getDataBlocksForMeters(STableQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters,
                               const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks);
H
hjxilinx 已提交
201
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
H
hzcheng 已提交
202
                              int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
H
hjxilinx 已提交
203
int32_t vnodeGetHeaderFile(SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIndex);
H
hzcheng 已提交
204

S
slguan 已提交
205 206 207 208 209 210 211 212
/**
 * Create SMeterQueryInfo.
 * The MeterQueryInfo is created one for each table during super table query
 *
 * @param skey
 * @param ekey
 * @return
 */
H
hjxilinx 已提交
213
SMeterQueryInfo* createMeterQueryInfo(STableQuerySupportObj* pSupporter, int32_t sid, TSKEY skey, TSKEY ekey);
S
slguan 已提交
214 215 216 217 218 219

/**
 * Destroy meter query info
 * @param pMeterQInfo
 * @param numOfCols
 */
H
hjxilinx 已提交
220
void destroyMeterQueryInfo(SMeterQueryInfo* pMeterQueryInfo, int32_t numOfCols);
S
slguan 已提交
221 222 223 224 225 226 227

/**
 * change the meter query info for supplement scan
 * @param pMeterQueryInfo
 * @param skey
 * @param ekey
 */
H
hjxilinx 已提交
228
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
H
hjxilinx 已提交
229
                                        TSKEY skey, TSKEY ekey);
S
slguan 已提交
230 231 232 233

/**
 * add the new allocated disk page to meter query info
 * the new allocated disk page is used to keep the intermediate (interval) results
H
hjxilinx 已提交
234
 * @param pQuery
S
slguan 已提交
235 236 237
 * @param pMeterQueryInfo
 * @param pSupporter
 */
H
hjxilinx 已提交
238 239
tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
                                        STableQuerySupportObj* pSupporter);
S
slguan 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262

/**
 * save the query range data into SMeterQueryInfo
 * @param pRuntimeEnv
 * @param pMeterQueryInfo
 */
void saveIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo);

/**
 * restore the query range data from SMeterQueryInfo to runtime environment
 *
 * @param pRuntimeEnv
 * @param pMeterQueryInfo
 */
void restoreIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo);

/**
 * set the interval query range for the interval query, when handling a data(cache) block
 *
 * @param pMeterQueryInfo
 * @param pSupporter
 * @param key
 */
H
hjxilinx 已提交
263
void setIntervalQueryRange(SMeterQueryInfo* pMeterQueryInfo, STableQuerySupportObj* pSupporter, int64_t key);
S
slguan 已提交
264 265 266 267 268 269 270 271

/**
 * set the meter data information
 * @param pMeterDataInfo
 * @param pMeterObj current query meter object
 * @param meterIdx  meter index in the sid list
 * @param groupId  group index, which the meter is belonged to
 */
H
hzcheng 已提交
272 273 274 275 276 277 278 279
void setMeterDataInfo(SMeterDataInfo* pMeterDataInfo, SMeterObj* pMeterObj, int32_t meterIdx, int32_t groupId);

void vnodeSetTagValueInParam(tSidSet* pSidSet, SQueryRuntimeEnv* pRuntimeEnv, SMeterSidExtInfo* pMeterInfo);

void vnodeCheckIfDataExists(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj, bool* dataInDisk, bool* dataInCache);

void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);

H
hjxilinx 已提交
280
void vnodePrintQueryStatistics(STableQuerySupportObj* pSupporter);
H
hzcheng 已提交
281

H
hjxilinx 已提交
282 283
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
H
hjxilinx 已提交
284

H
hjxilinx 已提交
285 286
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
                          int32_t threshold, int16_t type);
287

H
hjxilinx 已提交
288 289
void    cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv);
void    resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
290 291
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);

H
hjxilinx 已提交
292
void    clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
293
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
H
hjxilinx 已提交
294 295
void    closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void    closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
S
slguan 已提交
296

H
hzcheng 已提交
297 298 299 300
#ifdef __cplusplus
}
#endif

301
#endif  // TDENGINE_VNODEQUERYIMPL_H