functionMgt.c 10.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "functionMgt.h"

18
#include "builtins.h"
19 20 21 22
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
S
shenglian zhou 已提交
23
#include "tudf.h"
24 25 26 27 28

/* State owned by the function manager for the lifetime of the process. */
typedef struct SFuncMgtService {
  SHashObj* pFuncNameHashTable;  // built-in function name -> int32_t index into funcMgtBuiltins
} SFuncMgtService;

29 30
/*
 * Descriptor for a user-defined function.
 * NOTE(review): not referenced anywhere in the visible part of this file.
 */
typedef struct SUdfInfo {
  SDataType outputDt;  // presumably the data type the UDF produces -- confirm against users of this struct
  int8_t    funcType;  // presumably distinguishes scalar from aggregate UDFs -- confirm
} SUdfInfo;

34
static SFuncMgtService gFunMgtService;
35 36
static TdThreadOnce    functionHashTableInit = PTHREAD_ONCE_INIT;
static int32_t         initFunctionCode = 0;
37

38
static void doInitFunctionTable() {
39 40
  gFunMgtService.pFuncNameHashTable =
      taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
41
  if (NULL == gFunMgtService.pFuncNameHashTable) {
H
Haojun Liao 已提交
42 43
    initFunctionCode = TSDB_CODE_FAILED;
    return;
44
  }
H
Haojun Liao 已提交
45

46
  for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
47 48
    if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name,
                                         strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
H
Haojun Liao 已提交
49 50
      initFunctionCode = TSDB_CODE_FAILED;
      return;
51
    }
52
  }
D
dapan1121 已提交
53 54
}

55
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
D
dapan1121 已提交
56
  if (fmIsUserDefinedFunc(funcId)) {
57 58 59
    return FUNC_MGT_AGG_FUNC == classification
               ? FUNC_AGGREGATE_UDF_ID == funcId
               : (FUNC_MGT_SCALAR_FUNC == classification ? FUNC_SCALAR_UDF_ID == funcId : false);
D
dapan1121 已提交
60
  }
61 62 63 64 65 66
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
    return false;
  }
  return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
}

H
Haojun Liao 已提交
67
int32_t fmFuncMgtInit() {
68
  taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
H
Haojun Liao 已提交
69
  return initFunctionCode;
70 71
}

72
/*
 * Resolves pFunc->functionName against the built-in name table.
 *
 * On a hit, fills pFunc->funcId and pFunc->funcType from funcMgtBuiltins and
 * returns whatever the built-in's translateFunc returns for (pFunc, pMsg, msgLen).
 * On a miss, returns TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION and leaves pFunc untouched.
 */
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
  const char* pName = pFunc->functionName;
  void*       pIndex = taosHashGet(gFunMgtService.pFuncNameHashTable, pName, strlen(pName));
  if (NULL == pIndex) {
    return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
  }

  // The table stores the int32_t index of the built-in inside funcMgtBuiltins.
  pFunc->funcId = *(int32_t*)pIndex;
  pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
  return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}

82 83 84 85
/* Returns true when the name pFunc has an entry in the built-in name table. */
bool fmIsBuiltinFunc(const char* pFunc) {
  void* pEntry = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
  return NULL != pEntry;
}

86
/*
 * Asks the built-in identified by pFunc->funcId what data it requires for
 * pTimeWindow.
 *
 * Falls back to FUNC_DATA_REQUIRED_DATA_LOAD when the id is user-defined, is
 * outside [0, funcMgtBuiltinsNum), or the built-in has no dataRequiredFunc.
 */
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  bool isBuiltin =
      !fmIsUserDefinedFunc(pFunc->funcId) && pFunc->funcId >= 0 && pFunc->funcId < funcMgtBuiltinsNum;

  if (isBuiltin && NULL != funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
    return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
  }
  return FUNC_DATA_REQUIRED_DATA_LOAD;
}

X
Xiaoyu Wang 已提交
96
/*
 * Copies the execution callbacks (getEnv, init, process, finalize, combine)
 * of built-in funcId into pFpSet.
 *
 * Returns TSDB_CODE_FAILED, leaving pFpSet untouched, when funcId is
 * user-defined or outside [0, funcMgtBuiltinsNum); TSDB_CODE_SUCCESS otherwise.
 */
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
  bool isBuiltin = !fmIsUserDefinedFunc(funcId) && funcId >= 0 && funcId < funcMgtBuiltinsNum;
  if (!isBuiltin) {
    return TSDB_CODE_FAILED;
  }

  pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
  pFpSet->init = funcMgtBuiltins[funcId].initFunc;
  pFpSet->process = funcMgtBuiltins[funcId].processFunc;
  pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
  pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;

  return TSDB_CODE_SUCCESS;
}

108 109 110 111
/*
 * Fills pFpSet with the generic UDF aggregate callbacks (udfAggGetEnv,
 * udfAggInit, udfAggProcess, udfAggFinalize). pFpSet->combine is not written.
 *
 * Returns TSDB_CODE_FAILED, leaving pFpSet untouched, when funcId is not a
 * user-defined id; TSDB_CODE_SUCCESS otherwise.
 */
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
  if (fmIsUserDefinedFunc(funcId)) {
    pFpSet->getEnv = udfAggGetEnv;
    pFpSet->init = udfAggInit;
    pFpSet->process = udfAggProcess;
    pFpSet->finalize = udfAggFinalize;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_FAILED;
}

D
dapan1121 已提交
119
/*
 * Copies the scalar execution callbacks of built-in funcId into pFpSet:
 * process comes from sprocessFunc, getEnv from getEnvFunc.
 *
 * Returns TSDB_CODE_FAILED, leaving pFpSet untouched, when funcId is
 * user-defined or outside [0, funcMgtBuiltinsNum); TSDB_CODE_SUCCESS otherwise.
 */
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
  bool isBuiltin = !fmIsUserDefinedFunc(funcId) && funcId >= 0 && funcId < funcMgtBuiltinsNum;
  if (!isBuiltin) {
    return TSDB_CODE_FAILED;
  }

  pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
  pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;

  return TSDB_CODE_SUCCESS;
}

128
/*
 * True when funcId is classified FUNC_MGT_AGG_FUNC; for user-defined ids this
 * holds only for FUNC_AGGREGATE_UDF_ID (see isSpecificClassifyFunc).
 */
bool fmIsAggFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_AGG_FUNC);
}
129

130
/*
 * True when funcId is classified FUNC_MGT_SCALAR_FUNC; for user-defined ids
 * this holds only for FUNC_SCALAR_UDF_ID (see isSpecificClassifyFunc).
 */
bool fmIsScalarFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC);
}
131

X
Xiaoyu Wang 已提交
132 133
/*
 * True for every id that fmIsScalarFunc rejects. Note that this includes ids
 * that are out of range, since those are not scalar either.
 */
bool fmIsVectorFunc(int32_t funcId) {
  if (fmIsScalarFunc(funcId)) {
    return false;
  }
  return true;
}

134 135
/* True when built-in funcId carries the FUNC_MGT_SELECT_FUNC classification. */
bool fmIsSelectFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC);
}

X
Xiaoyu Wang 已提交
136 137
/* True when built-in funcId carries the FUNC_MGT_TIMELINE_FUNC classification. */
bool fmIsTimelineFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC);
}

138
/* True when built-in funcId carries the FUNC_MGT_PSEUDO_COLUMN_FUNC classification. */
bool fmIsPseudoColumnFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC);
}
X
Xiaoyu Wang 已提交
139

140
/* True when built-in funcId carries the FUNC_MGT_SCAN_PC_FUNC classification. */
bool fmIsScanPseudoColumnFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_SCAN_PC_FUNC);
}
141

142
/* True when built-in funcId carries the FUNC_MGT_WINDOW_PC_FUNC classification. */
bool fmIsWindowPseudoColumnFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_WINDOW_PC_FUNC);
}
X
Xiaoyu Wang 已提交
143

144 145
/* True when funcId is either an aggregate function or a window pseudo-column function. */
bool fmIsWindowClauseFunc(int32_t funcId) {
  if (fmIsAggFunc(funcId)) {
    return true;
  }
  return fmIsWindowPseudoColumnFunc(funcId);
}

X
Xiaoyu Wang 已提交
146
/* True when built-in funcId carries the FUNC_MGT_INDEFINITE_ROWS_FUNC classification. */
bool fmIsIndefiniteRowsFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC);
}
H
Haojun Liao 已提交
147

148 149 150 151 152 153 154
/* True when built-in funcId carries the FUNC_MGT_SPECIAL_DATA_REQUIRED classification. */
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
  bool matches = isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
  return matches;
}

/* True when built-in funcId carries the FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED classification. */
bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
  bool matches = isSpecificClassifyFunc(funcId, FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED);
  return matches;
}
H
Haojun Liao 已提交
155

156
/* True when built-in funcId carries the FUNC_MGT_MULTI_RES_FUNC classification. */
bool fmIsMultiResFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
}
157

X
Xiaoyu Wang 已提交
158 159
/* True when built-in funcId carries the FUNC_MGT_REPEAT_SCAN_FUNC classification. */
bool fmIsRepeatScanFunc(int32_t funcId) {
  return isSpecificClassifyFunc(funcId, FUNC_MGT_REPEAT_SCAN_FUNC);
}

160
/* An id is treated as user-defined when it is strictly greater than FUNC_UDF_ID_START. */
bool fmIsUserDefinedFunc(int32_t funcId) {
  return FUNC_UDF_ID_START < funcId;
}
161

X
Xiaoyu Wang 已提交
162 163
/*
 * Releases the built-in name table.
 *
 * The current table pointer is snapshotted and then swapped to 0 with a
 * compare-exchange; the table is cleaned up only if the exchange returns the
 * snapshotted pointer, i.e. this call is the one that cleared the global.
 * NOTE(review): presumably this is to make concurrent or repeated calls free
 * the table at most once -- confirm.
 */
void fmFuncMgtDestroy() {
  void* pTable = gFunMgtService.pFuncNameHashTable;
  if (pTable == NULL) {
    return;
  }
  if (atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, pTable, 0) != pTable) {
    return;
  }
  taosHashCleanup(pTable);
}
5
54liuyao 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187

/*
 * Points pFpSet->process at the invertFunc of built-in funcId.
 *
 * Returns TSDB_CODE_FAILED, leaving pFpSet untouched, when funcId is
 * user-defined or outside [0, funcMgtBuiltinsNum); TSDB_CODE_SUCCESS otherwise.
 */
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
  if (!fmIsUserDefinedFunc(funcId) && funcId >= 0 && funcId < funcMgtBuiltinsNum) {
    pFpSet->process = funcMgtBuiltins[funcId].invertFunc;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_FAILED;
}

/*
 * Points pFpSet->process at the regular processFunc of built-in funcId.
 *
 * Returns TSDB_CODE_FAILED, leaving pFpSet untouched, when funcId is
 * user-defined or outside [0, funcMgtBuiltinsNum); TSDB_CODE_SUCCESS otherwise.
 */
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
  if (!fmIsUserDefinedFunc(funcId) && funcId >= 0 && funcId < funcMgtBuiltinsNum) {
    pFpSet->process = funcMgtBuiltins[funcId].processFunc;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_FAILED;
}

bool fmIsInvertible(int32_t funcId) {
  bool res = false;
  switch (funcMgtBuiltins[funcId].type) {
X
Xiaoyu Wang 已提交
188 189 190 191
    case FUNCTION_TYPE_COUNT:
    case FUNCTION_TYPE_SUM:
    case FUNCTION_TYPE_STDDEV:
    case FUNCTION_TYPE_AVG:
192 193 194
    case FUNCTION_TYPE_WSTARTTS:
    case FUNCTION_TYPE_WENDTS:
    case FUNCTION_TYPE_WDURATION:
X
Xiaoyu Wang 已提交
195 196 197 198
      res = true;
      break;
    default:
      break;
5
54liuyao 已提交
199 200 201
  }
  return res;
}
X
Xiaoyu Wang 已提交
202

203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
/*
 * Resolves pFunc->functionName to a built-in and runs its translateFunc.
 *
 * Uses the name hash table through fmGetFuncInfo when the table exists;
 * otherwise falls back to a linear strcmp scan of funcMgtBuiltins. Any
 * message produced by translateFunc goes into a local 64-byte buffer and is
 * discarded. Returns TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION when no built-in
 * matches.
 */
static int32_t getFuncInfo(SFunctionNode* pFunc) {
  char msg[64] = {0};

  if (NULL == gFunMgtService.pFuncNameHashTable) {
    for (int32_t idx = 0; idx < funcMgtBuiltinsNum; ++idx) {
      if (0 != strcmp(funcMgtBuiltins[idx].name, pFunc->functionName)) {
        continue;
      }
      pFunc->funcId = idx;
      pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
      return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, msg, sizeof(msg));
    }
    return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
  }

  return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}

X
Xiaoyu Wang 已提交
218 219 220 221 222 223 224
/*
 * Creates a function node named pName with pParameterList as its parameters
 * and resolves it through getFuncInfo.
 *
 * Returns the new node, which then references pParameterList, or NULL when
 * pName is NULL, the node cannot be allocated, or getFuncInfo fails. On every
 * NULL return pParameterList is left untouched, so the caller is still
 * responsible for it.
 *
 * The name is copied with a bounded snprintf so an over-long name is
 * truncated (and then simply fails lookup) instead of overflowing
 * functionName. The NULL check matters because the pPartialFunc/pMergeFunc
 * names passed in by callers in this file may be NULL for some built-ins.
 */
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
  if (NULL == pName) {
    return NULL;
  }
  SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pFunc) {
    return NULL;
  }
  snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pName);
  pFunc->pParameterList = pParameterList;
  if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
    // Detach the list first so destroying the node does not take the caller's list with it.
    pFunc->pParameterList = NULL;
    nodesDestroyNode(pFunc);
    return NULL;
  }
  return pFunc;
}

/*
 * Creates a column node that stands for the result of pFunc: the column name
 * is pFunc's alias and the result type is copied from pFunc.
 * Returns NULL when the node cannot be allocated.
 */
static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
  SColumnNode* pColumn = nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL != pColumn) {
    strcpy(pColumn->colName, pFunc->node.aliasName);
    pColumn->node.resType = pFunc->node.resType;
  }
  return pColumn;
}

bool fmIsDistExecFunc(int32_t funcId) {
X
Xiaoyu Wang 已提交
244 245 246
  if (fmIsUserDefinedFunc(funcId)) {
    return false;
  }
X
Xiaoyu Wang 已提交
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
  if (!fmIsVectorFunc(funcId)) {
    return true;
  }
  return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc);
}

/*
 * Builds the partial-stage counterpart of pSrcFunc: a function named by the
 * built-in's pPartialFunc, taking a clone of pSrcFunc's parameter list.
 *
 * The new node's alias is "<functionName>.<address of pSrcFunc>".
 * Returns TSDB_CODE_OUT_OF_MEMORY when the parameter list cannot be cloned
 * (in which case *pPartialFunc is not written) or when the function node
 * cannot be created (in which case *pPartialFunc is NULL and the cloned list
 * is destroyed).
 */
static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) {
  SNodeList* pClonedParams = nodesCloneList(pSrcFunc->pParameterList);
  if (NULL == pClonedParams) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SFunctionNode* pNewFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pClonedParams);
  *pPartialFunc = pNewFunc;
  if (NULL == pNewFunc) {
    nodesDestroyList(pClonedParams);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  snprintf(pNewFunc->node.aliasName, sizeof(pNewFunc->node.aliasName), "%s.%p", pNewFunc->functionName, pSrcFunc);
  return TSDB_CODE_SUCCESS;
}

/*
 * Builds the merge-stage counterpart of pSrcFunc: a function named by the
 * built-in's pMergeFunc whose single parameter is a column referring to the
 * output of pPartialFunc. The merge function inherits pSrcFunc's alias.
 *
 * *pMergeFunc is set to NULL on every failure path. If the column cannot be
 * created or appended, the error from nodesListMakeStrictAppend is returned
 * and no merge function is built (previously that result was ignored and the
 * function was created with a NULL or incomplete parameter list). If the
 * function node itself cannot be created, TSDB_CODE_OUT_OF_MEMORY is returned.
 *
 * NOTE(review): assumes nodesListMakeStrictAppend returns an int32_t status
 * that is TSDB_CODE_SUCCESS on success -- confirm against the nodes API.
 */
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
                                   SFunctionNode** pMergeFunc) {
  SNodeList* pParameterList = NULL;
  *pMergeFunc = NULL;

  int32_t code = nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc));
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pParameterList);
    return code;
  }

  *pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
  if (NULL == *pMergeFunc) {
    nodesDestroyList(pParameterList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
  return TSDB_CODE_SUCCESS;
}

/*
 * Produces the partial/merge function pair used to execute pFunc in
 * distributed form.
 *
 * On success returns TSDB_CODE_SUCCESS with *pPartialFunc and *pMergeFunc set
 * to newly created nodes. Returns TSDB_CODE_FAILED when fmIsDistExecFunc
 * rejects pFunc->funcId, or the error from the failing creation step.
 *
 * Both outputs are reset to NULL before any work and again after cleanup, so
 * on every failure path they are NULL. Previously the cleanup could read an
 * output that had never been assigned, and it left freed pointers behind in
 * the caller's variables.
 */
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
  *pPartialFunc = NULL;
  *pMergeFunc = NULL;

  if (!fmIsDistExecFunc(pFunc->funcId)) {
    return TSDB_CODE_FAILED;
  }

  int32_t code = createPartialFunction(pFunc, pPartialFunc);
  if (TSDB_CODE_SUCCESS == code) {
    code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(*pPartialFunc);
    nodesDestroyNode(*pMergeFunc);
    *pPartialFunc = NULL;
    *pMergeFunc = NULL;
  }

  return code;
}