function.h 9.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#ifndef TDENGINE_FUNCTION_H
#define TDENGINE_FUNCTION_H
18 19 20 21 22

#ifdef __cplusplus
extern "C" {
#endif

H
Haojun Liao 已提交
23
#include "tbuffer.h"
S
common  
Shengliang Guan 已提交
24
#include "tcommon.h"
H
Haojun Liao 已提交
25
#include "tvariant.h"
H
Haojun Liao 已提交
26 27 28 29

struct SqlFunctionCtx;
struct SResultRowEntryInfo;

30 31
struct SFunctionNode;
typedef struct SScalarParam SScalarParam;
H
Haojun Liao 已提交
32 33 34 35 36

typedef struct SFuncExecEnv {
  int32_t calcMemSize;
} SFuncExecEnv;

37
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
H
Haojun Liao 已提交
38
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
H
Haojun Liao 已提交
39
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
40
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
41
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
5
54liuyao 已提交
42
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
43 44

typedef struct SScalarFuncExecFuncs {
45
  FExecGetEnv getEnv;
46 47
  FScalarExecProcess process;
} SScalarFuncExecFuncs;
H
Haojun Liao 已提交
48 49 50 51 52 53

typedef struct SFuncExecFuncs {
  FExecGetEnv getEnv;
  FExecInit init;
  FExecProcess process;
  FExecFinalize finalize;
5
54liuyao 已提交
54
  FExecCombine combine;
H
Haojun Liao 已提交
55
} SFuncExecFuncs;
56

H
Haojun Liao 已提交
57 58 59 60 61 62
typedef struct SFileBlockInfo {
  int32_t numBlocksOfStep;
} SFileBlockInfo;

#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW  1000000  // maximum allowed time windows in final results
63

H
Haojun Liao 已提交
64 65
#define FUNCTION_TYPE_SCALAR       1
#define FUNCTION_TYPE_AGG          2
66

67
#define TOP_BOTTOM_QUERY_LIMIT    100
68 69
#define FUNCTIONS_NAME_MAX_LENGTH 16

H
Haojun Liao 已提交
70
typedef struct SResultRowEntryInfo {
H
Haojun Liao 已提交
71 72
  bool     initialized:1;     // output buffer has been initialized
  bool     complete:1;        // query has completed
H
Haojun Liao 已提交
73
  uint8_t  isNullRes:6;       // the result is null
74
  uint8_t  numOfRes;          // num of output result in current buffer
H
Haojun Liao 已提交
75 76
} SResultRowEntryInfo;

77 78
// determine the real data need to calculated the result
enum {
79 80 81 82
  BLK_DATA_NOT_LOAD  = 0x0,
  BLK_DATA_SMA_LOAD  = 0x1,
  BLK_DATA_DATA_LOAD = 0x3,
  BLK_DATA_FILTEROUT = 0x4,   // discard current data block since it is not qualified for filter
83 84 85
};

enum {
H
Haojun Liao 已提交
86
  MAIN_SCAN     = 0x0u,
87
  REVERSE_SCAN  = 0x1u,  // todo remove it
88 89 90 91
  REPEAT_SCAN   = 0x2u,  //repeat scan belongs to the master scan
  MERGE_STAGE   = 0x20u,
};

92 93 94 95 96
typedef struct SPoint1 {
  int64_t   key;
  union{double  val; char* ptr;};
} SPoint1;

H
Haojun Liao 已提交
97
struct SqlFunctionCtx;
98
struct SResultRowEntryInfo;
99 100

//for selectivity query, the corresponding tag value is assigned if the data is qualified
101
typedef struct SSubsidiaryResInfo {
102
  int16_t num;
103 104
  struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo;
105

H
Haojun Liao 已提交
106
typedef struct SResultDataInfo {
H
Haojun Liao 已提交
107 108
  int16_t precision;
  int16_t scale;
H
Haojun Liao 已提交
109 110
  int16_t type;
  int16_t bytes;
H
Haojun Liao 已提交
111
  int32_t interBufSize;
H
Haojun Liao 已提交
112 113
} SResultDataInfo;

H
Haojun Liao 已提交
114 115
#define GET_RES_INFO(ctx)        ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
116

H
Haojun Liao 已提交
117 118 119 120 121 122 123 124 125
typedef struct SInputColumnInfoData {
  int32_t           totalRows;      // total rows in current columnar data
  int32_t           startRowIndex;  // handle started row index
  int32_t           numOfRows;      // the number of rows needs to be handled
  int32_t           numOfInputCols; // PTS is not included
  bool              colDataAggIsSet;// if agg is set or not
  SColumnInfoData  *pPTS;           // primary timestamp column
  SColumnInfoData **pData;
  SColumnDataAgg  **pColumnDataAgg;
126
  uint64_t          uid;            // table uid, used to set the tag value when building the final query result for selectivity functions.
H
Haojun Liao 已提交
127
} SInputColumnInfoData;
H
Haojun Liao 已提交
128

129
// sql function runtime context
H
Haojun Liao 已提交
130
typedef struct SqlFunctionCtx {
131 132 133 134 135 136 137 138
  SInputColumnInfoData   input;
  SResultDataInfo        resDataInfo;
  uint32_t               order;  // data block scanner order: asc|desc
  uint8_t                scanFlag;  // record current running step, default: 0
  int16_t                functionId;    // function id
  char                  *pOutput;       // final result output buffer, point to sdata->data
  int32_t                numOfParams;
  SFunctParam           *param;         // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
139
  int64_t               *ptsList;       // corresponding timestamp array list, todo remove it
140 141
  SColumnInfoData       *pTsOutput;     // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
  int32_t                offset;
142
  struct  SResultRowEntryInfo *resultInfo;
143 144 145 146 147
  SSubsidiaryResInfo     subsidiaries;
  SPoint1                start;
  SPoint1                end;
  SFuncExecFuncs         fpSet;
  SScalarFuncExecFuncs   sfp;
148
  struct SExprInfo      *pExpr;
149 150 151
  struct SDiskbasedBuf  *pBuf;
  struct SSDataBlock    *pSrcBlock;
  int32_t                curBufPage;
S
shenglian zhou 已提交
152

153
  char                   udfName[TSDB_FUNC_NAME_LEN];
H
Haojun Liao 已提交
154
} SqlFunctionCtx;
155

156 157 158 159
enum {
  TEXPR_NODE_DUMMY     = 0x0,
  TEXPR_BINARYEXPR_NODE= 0x1,
  TEXPR_UNARYEXPR_NODE = 0x2,
160
  TEXPR_FUNCTION_NODE  = 0x3,
161 162 163 164 165
  TEXPR_COL_NODE       = 0x4,
  TEXPR_VALUE_NODE     = 0x8,
};

typedef struct tExprNode {
H
Haojun Liao 已提交
166
  int32_t nodeType;
167 168 169
  union {
    SSchema            *pSchema;// column node
    struct SVariant    *pVal;   // value node
170 171

    struct {// function node
H
Haojun Liao 已提交
172 173
      char              functionName[FUNCTIONS_NAME_MAX_LENGTH];  // todo refactor
      int32_t           functionId;
174
      int32_t           num;
175
      struct SFunctionNode    *pFunctNode;
176
    } _function;
177 178 179 180

    struct {
      struct SNode* pRootNode;
    } _optrRoot;
181 182 183 184 185
  };
} tExprNode;

void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));

186
typedef struct SAggFunctionInfo {
187 188 189 190 191
  char      name[FUNCTIONS_NAME_MAX_LENGTH];
  int8_t    type;         // Scalar function or aggregation function
  uint32_t  functionId;   // Function Id
  int8_t    sFunctionId;  // Transfer function for super table query
  uint16_t  status;
192

H
Haojun Liao 已提交
193 194
  bool (*init)(SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);  // setup the execute environment
  void (*addInput)(SqlFunctionCtx *pCtx);
195 196

  // finalizer must be called after all exec has been executed to generated final result.
H
Haojun Liao 已提交
197 198
  void (*finalize)(SqlFunctionCtx *pCtx);
  void (*combine)(SqlFunctionCtx *pCtx);
199

H
Haojun Liao 已提交
200
  int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
201 202
} SAggFunctionInfo;

203
struct SScalarParam {
204 205 206 207
  SColumnInfoData *columnData;
  SHashObj        *pHashFilter;
  void            *param;  // other parameter, such as meta handle from vnode, to extract table name/tag value
  int32_t          numOfRows;
208
};
209

210 211
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
                          bool isSuperTable);
212

213 214
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);

H
Haojun Liao 已提交
215
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
216
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
H
Haojun Liao 已提交
217
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
218 219 220 221 222 223 224 225 226 227 228 229 230
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// fill api
struct SFillInfo;
struct SFillColInfo;

typedef struct SPoint {
  int64_t key;
  void *  val;
} SPoint;

231 232 233 234 235 236 237 238 239 240 241 242 243
//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
//
//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
//                                     SInterval* pInterval, int32_t fillType,
//                                     struct SFillColInfo* pCol, const char* id);
//
//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
244 245 246

int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);

H
Haojun Liao 已提交
247 248 249 250 251 252 253
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api
struct SUdfInfo;

void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);

S
slzhou 已提交
254
/**
255
 * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
S
slzhou 已提交
256 257 258 259 260 261 262 263 264 265
 * @return error code
 */
int32_t udfcOpen();

/**
 * destroy udfd proxy
 * @return error code
 */
int32_t udfcClose();

266 267 268 269 270 271 272 273 274 275 276
/**
 * start udfd that serves udf function invocation under dnode startDnodeId
 * @param startDnodeId
 * @return
 */
int32_t udfStartUdfd(int32_t startDnodeId);
/**
 * stop udfd
 * @return
 */
int32_t udfStopUdfd();
277 278 279 280
#ifdef __cplusplus
}
#endif

281
#endif  // TDENGINE_FUNCTION_H