function.h 10.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#ifndef TDENGINE_FUNCTION_H
#define TDENGINE_FUNCTION_H
18 19 20 21 22

#ifdef __cplusplus
extern "C" {
#endif

H
Haojun Liao 已提交
23
#include "tbuffer.h"
S
common  
Shengliang Guan 已提交
24
#include "tcommon.h"
H
Haojun Liao 已提交
25
#include "tvariant.h"
H
Haojun Liao 已提交
26 27 28 29

// Forward declarations. They are introduced up front so that the callback
// typedefs that follow can take pointers to these types before (or without)
// their full definitions being visible.
struct SqlFunctionCtx;
struct SResultRowEntryInfo;

struct SFunctionNode;
typedef struct SScalarParam SScalarParam;
H
Haojun Liao 已提交
32 33 34 35 36

// Execution-environment descriptor of a function. It is handed by pointer to
// FExecGetEnv callbacks.
typedef struct SFuncExecEnv {
  // NOTE(review): presumably the number of bytes of working memory the
  // function needs for its calculation -- confirm against the implementations.
  int32_t calcMemSize;
} SFuncExecEnv;

37
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
H
Haojun Liao 已提交
38
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
H
Haojun Liao 已提交
39
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
40
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
41
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
5
54liuyao 已提交
42
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
43 44

typedef struct SScalarFuncExecFuncs {
45
  FExecGetEnv getEnv;
46 47
  FScalarExecProcess process;
} SScalarFuncExecFuncs;
H
Haojun Liao 已提交
48 49 50 51 52 53

typedef struct SFuncExecFuncs {
  FExecGetEnv getEnv;
  FExecInit init;
  FExecProcess process;
  FExecFinalize finalize;
5
54liuyao 已提交
54
  FExecCombine combine;
H
Haojun Liao 已提交
55
} SFuncExecFuncs;
56

H
Haojun Liao 已提交
57 58 59 60 61 62
// Per-step file block counter.
// NOTE(review): presumably one histogram bucket of the block distribution
// (see TSDB_BLOCK_DIST_STEP_ROWS) -- confirm against the users of this type.
typedef struct SFileBlockInfo {
  int32_t numBlocksOfStep;
} SFileBlockInfo;

#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW  1000000  // maximum allowed time windows in final results

// Function categories (see SAggFunctionInfo.type).
#define FUNCTION_TYPE_SCALAR       1
#define FUNCTION_TYPE_AGG          2

#define TOP_BOTTOM_QUERY_LIMIT    100
#define FUNCTIONS_NAME_MAX_LENGTH 16

// Built-in function ids. The negative sentinel is parenthesized so that it
// expands safely inside any surrounding expression.
#define FUNCTION_INVALID_ID  (-1)
#define FUNCTION_COUNT        0
#define FUNCTION_SUM          1
#define FUNCTION_AVG          2
#define FUNCTION_MIN          3
#define FUNCTION_MAX          4
#define FUNCTION_STDDEV       5
#define FUNCTION_PERCT        6
#define FUNCTION_APERCT       7
#define FUNCTION_FIRST        8
#define FUNCTION_LAST         9
#define FUNCTION_LAST_ROW     10
#define FUNCTION_TOP          11
#define FUNCTION_BOTTOM       12
#define FUNCTION_SPREAD       13
#define FUNCTION_TWA          14
#define FUNCTION_LEASTSQR     15

#define FUNCTION_TS           16
#define FUNCTION_TS_DUMMY     17
#define FUNCTION_TAG_DUMMY    18
#define FUNCTION_TS_COMP      19

#define FUNCTION_TAG          20
#define FUNCTION_PRJ          21

#define FUNCTION_TAGPRJ       22
#define FUNCTION_ARITHM       23
#define FUNCTION_DIFF         24

#define FUNCTION_FIRST_DST    25
#define FUNCTION_LAST_DST     26
#define FUNCTION_STDDEV_DST   27
#define FUNCTION_INTERP       28

#define FUNCTION_RATE         29
#define FUNCTION_IRATE        30
#define FUNCTION_TID_TAG      31
#define FUNCTION_DERIVATIVE   32
#define FUNCTION_BLKINFO      33

// Ids 34..37 are not defined in this header.
#define FUNCTION_COV          38

H
Haojun Liao 已提交
114
// Header describing the state of one result-row entry. GET_ROWCELL_INTERBUF
// addresses the memory located directly after this header.
typedef struct SResultRowEntryInfo {
  bool     initialized:1;     // output buffer has been initialized
  bool     complete:1;        // query has completed
  uint8_t  isNullRes:6;       // the result is null
  uint8_t  numOfRes;          // num of output result in current buffer
} SResultRowEntryInfo;

121 122
// Determines which data of a block has to be loaded to calculate the result.
enum {
  BLK_DATA_NOT_LOAD  = 0x0,
  BLK_DATA_SMA_LOAD  = 0x1,
  BLK_DATA_DATA_LOAD = 0x3,   // NOTE(review): has the 0x1 (SMA) bit set as well -- confirm this is intended
  BLK_DATA_FILTEROUT = 0x4,   // discard current data block since it is not qualified for filter
};

// Scan stage values (SqlFunctionCtx has a scanFlag field that records the
// current running step).
enum {
  MAIN_SCAN     = 0x0u,
  REVERSE_SCAN  = 0x1u,  // todo remove it
  REPEAT_SCAN   = 0x2u,  // repeat scan belongs to the master scan
  MERGE_STAGE   = 0x20u,
};

136 137 138 139 140
// A keyed point whose value is stored either inline as a double or as a
// pointer. SqlFunctionCtx holds two of these as its start and end fields.
typedef struct SPoint1 {
  int64_t   key;  // NOTE(review): presumably a timestamp -- confirm
  // Anonymous union: val and ptr share storage, only one of them is valid at a time.
  union{double  val; char* ptr;};
} SPoint1;

H
Haojun Liao 已提交
141
struct SqlFunctionCtx;
struct SResultRowEntryInfo;

// for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SSubsidiaryResInfo {
  int16_t                 num;   // NOTE(review): presumably the number of entries in pCtx -- confirm
  struct SqlFunctionCtx **pCtx;  // array of pointers to function contexts
} SSubsidiaryResInfo;
149

H
Haojun Liao 已提交
150
// Description of a function's result data. getResultDataInfo() takes a pointer
// to one of these as its pInfo parameter.
// NOTE(review): field meanings below are inferred from the names -- confirm.
typedef struct SResultDataInfo {
  int16_t precision;
  int16_t scale;
  int16_t type;          // presumably the result data type
  int16_t bytes;         // presumably the result width in bytes
  int32_t interBufSize;  // presumably the size of the intermediate buffer
} SResultDataInfo;

H
Haojun Liao 已提交
158 159
// Yields the resultInfo member of the given context pointer.
#define GET_RES_INFO(ctx)        ((ctx)->resultInfo)
// Yields a void* to the byte located sizeof(SResultRowEntryInfo) past _c, i.e.
// the memory that directly follows an entry header starting at _c.
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
160

H
Haojun Liao 已提交
161 162 163 164 165 166 167 168 169
// Columnar input handed to a function through SqlFunctionCtx.input.
typedef struct SInputColumnInfoData {
  int32_t           totalRows;      // total rows in current columnar data
  int32_t           startRowIndex;  // handle started row index
  int32_t           numOfRows;      // the number of rows needs to be handled
  int32_t           numOfInputCols; // PTS is not included
  bool              colDataAggIsSet;// if agg is set or not
  SColumnInfoData  *pPTS;           // primary timestamp column
  SColumnInfoData **pData;
  SColumnDataAgg  **pColumnDataAgg;
  uint64_t          uid;            // table uid, used to set the tag value when building the final query result for selectivity functions.
} SInputColumnInfoData;
H
Haojun Liao 已提交
172

173
// sql function runtime context
H
Haojun Liao 已提交
174
typedef struct SqlFunctionCtx {
175 176 177 178 179 180 181 182 183 184 185 186
  SInputColumnInfoData   input;
  SResultDataInfo        resDataInfo;
  uint32_t               order;  // data block scanner order: asc|desc
  uint8_t                scanFlag;  // record current running step, default: 0
  int16_t                functionId;    // function id
  char                  *pOutput;       // final result output buffer, point to sdata->data
  int32_t                numOfParams;
  SFunctParam           *param;         // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
  int64_t               *ptsList;       // corresponding timestamp array list
  SColumnInfoData       *pTsOutput;     // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
  int32_t                offset;
  SVariant               tag;
187
  struct  SResultRowEntryInfo *resultInfo;
188 189 190 191 192
  SSubsidiaryResInfo     subsidiaries;
  SPoint1                start;
  SPoint1                end;
  SFuncExecFuncs         fpSet;
  SScalarFuncExecFuncs   sfp;
193
  struct SExprInfo      *pExpr;
194 195 196
  struct SDiskbasedBuf  *pBuf;
  struct SSDataBlock    *pSrcBlock;
  int32_t                curBufPage;
S
shenglian zhou 已提交
197

198
  char                   udfName[TSDB_FUNC_NAME_LEN];
H
Haojun Liao 已提交
199
} SqlFunctionCtx;
200

201 202 203 204
// Node kinds of an expression tree (tExprNode has a nodeType field).
enum {
  TEXPR_NODE_DUMMY     = 0x0,
  TEXPR_BINARYEXPR_NODE= 0x1,
  TEXPR_UNARYEXPR_NODE = 0x2,
  // NOTE(review): 0x3 equals (BINARYEXPR | UNARYEXPR); unlike the other values
  // it is not a distinct bit -- confirm node types are compared by equality only.
  TEXPR_FUNCTION_NODE  = 0x3,
  TEXPR_COL_NODE       = 0x4,
  TEXPR_VALUE_NODE     = 0x8,
};

typedef struct tExprNode {
H
Haojun Liao 已提交
211
  int32_t nodeType;
212 213 214
  union {
    SSchema            *pSchema;// column node
    struct SVariant    *pVal;   // value node
215 216

    struct {// function node
H
Haojun Liao 已提交
217 218
      char              functionName[FUNCTIONS_NAME_MAX_LENGTH];  // todo refactor
      int32_t           functionId;
219
      int32_t           num;
220
      struct SFunctionNode    *pFunctNode;
221
    } _function;
222 223 224 225

    struct {
      struct SNode* pRootNode;
    } _optrRoot;
226 227 228 229 230
  };
} tExprNode;

/**
 * Destroys an expression tree.
 * @param pNode node to destroy
 * @param fp    callback taking a void*; NOTE(review): presumably invoked to
 *              release node payloads -- confirm against the implementation
 */
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));

231
typedef struct SAggFunctionInfo {
232 233 234 235 236
  char      name[FUNCTIONS_NAME_MAX_LENGTH];
  int8_t    type;         // Scalar function or aggregation function
  uint32_t  functionId;   // Function Id
  int8_t    sFunctionId;  // Transfer function for super table query
  uint16_t  status;
237

H
Haojun Liao 已提交
238 239
  bool (*init)(SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);  // setup the execute environment
  void (*addInput)(SqlFunctionCtx *pCtx);
240 241

  // finalizer must be called after all exec has been executed to generated final result.
H
Haojun Liao 已提交
242 243
  void (*finalize)(SqlFunctionCtx *pCtx);
  void (*combine)(SqlFunctionCtx *pCtx);
244

H
Haojun Liao 已提交
245
  int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
246 247
} SAggFunctionInfo;

248
struct SScalarParam {
249 250 251 252
  SColumnInfoData *columnData;
  SHashObj        *pHashFilter;
  void            *param;  // other parameter, such as meta handle from vnode, to extract table name/tag value
  int32_t          numOfRows;
253
};
254

255 256
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
                          bool isSuperTable);
257

258 259
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);

H
Haojun Liao 已提交
260
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
261
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
H
Haojun Liao 已提交
262
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
263 264 265 266 267 268 269 270 271 272 273 274 275
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// fill api
// Opaque fill types; only declared here, not defined.
struct SFillInfo;
struct SFillColInfo;

// A keyed point whose value is referenced through an untyped pointer.
// taosGetLinearInterpolationVal() takes three of these.
typedef struct SPoint {
  int64_t key;
  void *  val;  // NOTE(review): the pointee type is presumably given by the caller's type arguments -- confirm
} SPoint;

276 277 278 279 280 281 282 283 284 285 286 287 288
//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
//
//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
//                                     SInterval* pInterval, int32_t fillType,
//                                     struct SFillColInfo* pCol, const char* id);
//
//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
289 290 291

/**
 * NOTE(review): presumably computes the value at `point` by linear
 * interpolation between `point1` and `point2`, reading the inputs as
 * `inputType` and writing the result as `outputType` -- confirm against
 * the implementation.
 * @return int32_t status code
 */
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);

H
Haojun Liao 已提交
292 293 294 295 296 297 298
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api
// Opaque UDF descriptor; only declared here, not defined.
struct SUdfInfo;

// NOTE(review): presumably register / unregister the UDF descriptor under the
// given id -- confirm against the implementation.
void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);

S
slzhou 已提交
299
/**
 * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
 * @return error code
 */
int32_t udfcOpen(void);

/**
 * destroy udfd proxy
 * @return error code
 */
int32_t udfcClose(void);

311 312 313 314 315 316 317 318 319 320 321
/**
 * start udfd that serves udf function invocation under dnode startDnodeId
 * @param startDnodeId id of the dnode under which udfd is started
 * @return int32_t status code (NOTE(review): presumably 0 on success -- confirm)
 */
int32_t udfStartUdfd(int32_t startDnodeId);

/**
 * stop udfd
 * @return int32_t status code (NOTE(review): presumably 0 on success -- confirm)
 */
int32_t udfStopUdfd(void);
322 323 324 325
#ifdef __cplusplus
}
#endif

326
#endif  // TDENGINE_FUNCTION_H