executil.h 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H

S
common  
Shengliang Guan 已提交
18
#include "tcommon.h"
19
#include "tbuffer.h"
H
Haojun Liao 已提交
20
#include "tpagedbuf.h"
21

H
Haojun Liao 已提交
22 23 24 25 26 27 28
/*
 * Build a result-window key in the buffer _k.
 *
 * Bytes written, in order:
 *   [0, 8)          _uid, stored as a uint64_t in host byte order
 *   [8, 8 + _len)   _len bytes copied from _ori
 * The caller must provide at least GET_RES_WINDOW_KEY_LEN(_len) writable bytes.
 *
 * _uid must be an 8-byte value (checked with assert). The uid is stored with
 * memcpy rather than through a uint64_t* cast, so _k does not have to be
 * 8-byte aligned. Every argument is evaluated exactly once.
 */
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid)                 \
  do {                                                           \
    assert(sizeof(_uid) == sizeof(uint64_t));                    \
    uint64_t _rwkUid = (uint64_t)(_uid);                         \
    char*    _rwkDst = (char*)(_k);                              \
    memcpy(_rwkDst, &_rwkUid, sizeof(_rwkUid));                  \
    memcpy(_rwkDst + sizeof(_rwkUid), (_ori), (_len));           \
  } while (0)

W
fix bug  
wpan 已提交
29 30 31
/*
 * Build an extended result-window key in the buffer _k.
 *
 * Bytes written, in order:
 *   [0, POINTER_BYTES)                      the pointer value _buf
 *   [POINTER_BYTES, POINTER_BYTES + 8)      _uid, stored as a uint64_t
 *   [POINTER_BYTES + 8, ... + _len)         _len bytes copied from _ori
 * The caller must provide at least GET_RES_EXT_WINDOW_KEY_LEN(_len) writable bytes.
 *
 * _uid must be an 8-byte value (checked with assert). Both header fields are
 * stored with memcpy rather than through pointer casts, so neither _k nor
 * _k + POINTER_BYTES has to be suitably aligned. Every argument is evaluated
 * exactly once.
 */
#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf)                  \
  do {                                                                      \
    assert(sizeof(_uid) == sizeof(uint64_t));                               \
    void*    _rwkBuf = (void*)(_buf);                                       \
    uint64_t _rwkUid = (uint64_t)(_uid);                                    \
    char*    _rwkDst = (char*)(_k);                                         \
    memcpy(_rwkDst, &_rwkBuf, sizeof(_rwkBuf));                             \
    memcpy(_rwkDst + POINTER_BYTES, &_rwkUid, sizeof(_rwkUid));             \
    memcpy(_rwkDst + POINTER_BYTES + sizeof(_rwkUid), (_ori), (_len));      \
  } while (0)


H
Haojun Liao 已提交
38
// Total size in bytes of a key built by SET_RES_WINDOW_KEY for a payload of _l bytes:
// the payload plus the leading uint64_t uid.
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
W
fix bug  
wpan 已提交
39 40
// Total size in bytes of a key built by SET_RES_EXT_WINDOW_KEY for a payload of _l bytes:
// the payload plus the uint64_t uid plus the leading POINTER_BYTES-wide pointer slot.
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)

41
// Casts _t to SExecTaskInfo* and yields its id.str member. No NULL or type check is done.
// NOTE(review): SExecTaskInfo is declared outside this header; id.str is presumably a
// printable task identifier used in log messages — confirm.
#define GET_TASKID(_t)  (((SExecTaskInfo*)(_t))->id.str)
H
Haojun Liao 已提交
42

H
Haojun Liao 已提交
43 44
// Yields the curIndex member of the object _winres points to.
// NOTE(review): no struct declared in this header has a curIndex member
// (SResultRowInfo uses curPos), so this accessor may be stale — confirm it still has users.
#define curTimeWindowIndex(_winres)        ((_winres)->curIndex)

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Forward declaration: this header only refers to SColumnFilterElem through pointers.
struct SColumnFilterElem;

// Signature of a column-filter predicate: it receives the filter element, two value
// pointers and a data type code, and returns a bool.
// NOTE(review): the disabled isNullOperator/notNullOperator prototypes later in this file
// name the two value arguments minval/maxval, so they presumably bound a range — confirm.
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);

// State passed to the *GroupResInfo / hasRemainData* / incNextGroup helpers declared in this header.
// NOTE(review): only pRows was documented originally; the remaining field notes are
// inferred from the names and must be confirmed against the implementation.
typedef struct SGroupResInfo {
  int32_t totalGroup;    // presumably the number of groups available — confirm
  int32_t currentGroup;  // presumably the group currently being consumed — confirm
  int32_t index;         // presumably a cursor into pRows — confirm
  SArray* pRows;      // SArray<SResultRow*>
  bool    ordered;       // NOTE(review): meaning not visible here (sorted output?) — confirm
  int32_t position;      // NOTE(review): meaning not visible here — confirm
} SGroupResInfo;

// One result row (e.g. the output of one time window) stored in the paged result buffer.
typedef struct SResultRow {
  int32_t       pageId;      // pageId & offset give the position of this result in the disk-based output buffer
  int32_t       offset:29;   // row index in buffer page (declared as a 29-bit bit-field)
  bool          startInterp; // the time window start timestamp has already been interpolated
  bool          endInterp;   // the time window end timestamp has already been interpolated
  bool          closed;      // status of this result: closed or still open
  uint32_t      numOfRows;   // number of rows of current time window
  struct SResultRowEntryInfo* pEntryInfo;  // one result info entry per result column
  STimeWindow   win;         // NOTE(review): presumably the time window this row covers — confirm
  char         *key;               // start key of current result row
} SResultRow;

typedef struct SResultRowInfo {
H
Haojun Liao 已提交
71
  SList* pRows;
72
  SResultRow** pResult;    // result list
73 74
//  int16_t      type:8;     // data type for hash key
  int32_t      size;       // number of result set
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
  int32_t      capacity;   // max capacity
  int32_t      curPos;     // current active result row index of pResult list
} SResultRowInfo;

// Pool object handled by initResultRowPool / getNewResultRow / destroyResultRowPool.
// NOTE(review): the field notes below are inferred from the names only — confirm against
// the pool implementation.
typedef struct SResultRowPool {
  int32_t elemSize;           // presumably the size in bytes of one pooled element — confirm
  int32_t blockSize;          // presumably the size in bytes of one allocation block — confirm
  int32_t numOfElemPerBlock;  // presumably blockSize / elemSize — confirm

  // Presumably the next free slot: which block, and which element inside it — confirm.
  struct {
    int32_t blockIndex;
    int32_t pos;
  } position;

  SArray* pData;    // SArray<void*>
} SResultRowPool;

H
Haojun Liao 已提交
92 93
// Types defined elsewhere; this header only refers to them through pointers,
// so forward declarations are sufficient.
struct STaskAttr;
struct STaskRuntimeEnv;
struct SUdfInfo;

H
Haojun Liao 已提交
96
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
97

H
Haojun Liao 已提交
98
size_t  getResultRowSize(SArray* pExprInfo);
99
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
H
Haojun Liao 已提交
100
void    cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
101

H
Haojun Liao 已提交
102
void    resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
H
Haojun Liao 已提交
103 104
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void    closeAllResultRows(SResultRowInfo* pResultRowInfo);
105

H
Haojun Liao 已提交
106 107 108
int32_t initResultRow(SResultRow *pResultRow);
void    closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool    isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
109
void    clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow);
110

111
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
H
Haojun Liao 已提交
112

H
Haojun Liao 已提交
113
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
H
Haojun Liao 已提交
114
int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable);
H
Haojun Liao 已提交
115

H
Haojun Liao 已提交
116 117 118
/**
 * Return the result row stored at index `slot` of pResultRowInfo->pResult.
 *
 * The arguments are validated with assert only: pResultRowInfo must be non-NULL
 * and slot must lie in [0, pResultRowInfo->size).
 */
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
  assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
  return pResultRowInfo->pResult[slot];
}
H
Haojun Liao 已提交
120

H
Haojun Liao 已提交
121
/**
 * Compute the address of a result cell inside a buffer page.
 *
 * @param pQueryAttr  query attributes; must be non-NULL (assert), otherwise unused for now
 * @param page        buffer page whose data area holds the row
 * @param rowOffset   byte offset of the row inside page->data; must be >= 0 (assert)
 * @param offset      additional offset of the cell inside the row
 * @return            page->data + rowOffset + offset * numOfRows, with numOfRows fixed to 1
 *
 * The previous body was entirely commented out, so this non-void function fell off
 * its end without returning a value (undefined behaviour once the caller uses the
 * result). The per-query row count lookup (getRowNumForMultioutput) stays disabled;
 * until it is restored this returns the same address as getPosInResultPage_rv.
 * NOTE(review): confirm that one row per output is the intended interim behaviour.
 */
static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
                                             int32_t offset) {
  assert(rowOffset >= 0 && pQueryAttr != NULL);

  const int32_t numOfRows = 1;
  return ((char *)page->data) + rowOffset + offset * numOfRows;
}
128

129 130 131 132 133 134 135
/**
 * Compute the address of a result cell inside a buffer page.
 *
 * rowOffset must be >= 0 (checked with assert). The row-count multiplier applied to
 * `offset` is fixed to 1; the per-query lookup through getRowNumForMultioutput is
 * disabled in this variant.
 */
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
  assert(rowOffset >= 0);

  const int32_t rowsPerOutput = 1;
  char*         pRowStart = ((char*)page->data) + rowOffset;
  return pRowStart + offset * rowsPerOutput;
}

136 137
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
H
Haojun Liao 已提交
138

139
// Returns a filter predicate for the given pair of lower/upper bound operator codes.
// NOTE(review): the operator code values and the result for an unsupported pair
// (NULL?) are not visible in this header — confirm.
__filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr);
140

H
Haojun Liao 已提交
141 142 143 144 145 146
// SResultRowPool API.
// NOTE(review): the implementations are not visible in this header. Confirm the unit of
// `size` (presumably bytes per row), whether getNewResultRow can return NULL, and what
// destroyResultRowPool returns (presumably NULL).
SResultRowPool* initResultRowPool(size_t size);
SResultRow* getNewResultRow(SResultRowPool* p);
int64_t getResultRowPoolMemSize(SResultRowPool* p);
void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
147

148 149 150 151 152 153 154 155
// Intermediate result of a stddev computation for one column.
// NOTE(review): purpose inferred from the type name; SResPair is declared elsewhere — confirm.
typedef struct {
  SArray* pResult;     // SArray<SResPair>
  int32_t colId;       // presumably the id of the column the results belong to — confirm
} SStddevInterResult;

// Serialization helpers for intermediate results.
// NOTE(review): the implementations are not visible in this header. Presumably
// interResToBinary writes pRes into bw and interResFromBinary parses it back into a
// newly allocated SArray; the element type, the role of tagLen and who frees the
// returned array must be confirmed. freeInterResult takes a void*, which suggests it
// is used as a generic element destructor callback — confirm.
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param);
156

H
Haojun Liao 已提交
157
// SGroupResInfo API.
// NOTE(review): the implementations are not visible in this header; the meaning of the
// bool/int32_t results and of the `offset` out-parameter of mergeIntoGroupResult must be
// confirmed against the .c file.
void    initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void    cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool    hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
bool    hasRemainData(SGroupResInfo* pGroupResInfo);

bool    incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);

int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
H
Haojun Liao 已提交
166

167
//int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
D
dapan1121 已提交
168

169
#endif  // TDENGINE_QUERYUTIL_H