mndFunc.c 19.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndFunc.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19 20 21
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
#define SDB_FUNC_VER 1
S
Shengliang Guan 已提交
25
#define SDB_FUNC_RESERVE_SIZE 64
S
Shengliang Guan 已提交
26 27 28 29 30

static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t  mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t  mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
S
Shengliang 已提交
31
static int32_t  mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
S
Shengliang 已提交
32 33 34 35 36 37 38
static int32_t  mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
static int32_t  mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
static int32_t  mndProcessCreateFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessDropFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessRetrieveFuncReq(SMnodeMsg *pReq);
static int32_t  mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t  mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
39 40 41 42 43 44 45 46 47 48 49
static void     mndCancelGetNextFunc(SMnode *pMnode, void *pIter);

int32_t mndInitFunc(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_FUNC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndFuncActionEncode,
                     .decodeFp = (SdbDecodeFp)mndFuncActionDecode,
                     .insertFp = (SdbInsertFp)mndFuncActionInsert,
                     .updateFp = (SdbUpdateFp)mndFuncActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndFuncActionDelete};

S
Shengliang 已提交
50 51 52
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
S
Shengliang Guan 已提交
53

S
Shengliang 已提交
54 55 56
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
S
Shengliang Guan 已提交
57

S
Shengliang Guan 已提交
58 59 60 61 62 63
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupFunc(SMnode *pMnode) {}

static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
64 65
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
66
  int32_t  size = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj) + SDB_FUNC_RESERVE_SIZE;
S
Shengliang Guan 已提交
67
  SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, size);
68
  if (pRaw == NULL) goto FUNC_ENCODE_OVER;
S
Shengliang Guan 已提交
69 70

  int32_t dataPos = 0;
71 72 73 74 75 76 77 78
  SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->align, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER)
S
Shengliang 已提交
79
  SDB_SET_INT64(pRaw, dataPos, pFunc->signature, FUNC_ENCODE_OVER)
80 81 82 83
  SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_ENCODE_OVER)
S
Shengliang Guan 已提交
84
  SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, FUNC_ENCODE_OVER)
85 86 87 88 89 90 91 92 93 94
  SDB_SET_DATALEN(pRaw, dataPos, FUNC_ENCODE_OVER);

  terrno = 0;

FUNC_ENCODE_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }
S
Shengliang Guan 已提交
95

S
Shengliang Guan 已提交
96
  mTrace("func:%s, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
S
Shengliang Guan 已提交
97 98 99 100
  return pRaw;
}

static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
101 102
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
103
  int8_t sver = 0;
104
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
105 106 107

  if (sver != SDB_FUNC_VER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
108
    goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
109 110
  }

S
Shengliang 已提交
111
  SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj));
112 113
  if (pRow == NULL) goto FUNC_DECODE_OVER;

S
Shengliang Guan 已提交
114
  SFuncObj *pFunc = sdbGetRowObj(pRow);
115
  if (pFunc == NULL) goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
116 117

  int32_t dataPos = 0;
118 119 120 121 122 123 124 125
  SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->align, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER)
S
Shengliang 已提交
126
  SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER)
127 128
  SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)
S
Shengliang 已提交
129 130 131 132 133 134 135 136 137

  pFunc->pComment = calloc(1, pFunc->commentSize);
  pFunc->pCode = calloc(1, pFunc->codeSize);
  if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
    goto FUNC_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER)
S
Shengliang Guan 已提交
138
  SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, FUNC_DECODE_OVER)
S
Shengliang Guan 已提交
139

140 141 142 143 144 145 146 147 148 149
  terrno = 0;

FUNC_DECODE_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
S
Shengliang Guan 已提交
150 151 152 153
  return pRow;
}

static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
154
  mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);
S
Shengliang Guan 已提交
155 156 157 158
  return 0;
}

static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
159
  mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);
S
Shengliang 已提交
160 161
  tfree(pFunc->pCode);
  tfree(pFunc->pComment);
S
Shengliang Guan 已提交
162 163 164
  return 0;
}

S
Shengliang 已提交
165 166
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
  mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
S
Shengliang Guan 已提交
167 168 169
  return 0;
}

S
Shengliang 已提交
170 171 172 173 174 175 176 177
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
  SSdb     *pSdb = pMnode->pSdb;
  SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
  if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
  }
  return pFunc;
}
S
Shengliang Guan 已提交
178

S
Shengliang 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pFunc);
}

static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SFuncObj func = {0};
  memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
  func.createdTime = taosGetTimestampMs();
  func.funcType = pCreate->funcType;
  func.scriptType = pCreate->scriptType;
  func.outputType = pCreate->outputType;
  func.outputLen = pCreate->outputLen;
  func.bufSize = pCreate->bufSize;
  func.signature = pCreate->signature;
  func.commentSize = pCreate->commentSize;
  func.codeSize = pCreate->codeSize;
  func.pComment = malloc(func.commentSize);
  func.pCode = malloc(func.codeSize);
  if (func.pCode == NULL || func.pCode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
204 205
  }

S
Shengliang Guan 已提交
206 207
  memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
  memcpy(func.pCode, pCreate->pCode, func.codeSize);
S
Shengliang 已提交
208 209 210 211

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) goto CREATE_FUNC_OVER;

S
Shengliang Guan 已提交
212 213
  mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);

S
Shengliang 已提交
214 215 216
  SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
217

S
Shengliang 已提交
218 219 220
  SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
221

S
Shengliang 已提交
222 223 224
  SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
225

S
Shengliang 已提交
226
  if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
227

S
Shengliang 已提交
228 229 230 231 232
  code = 0;

CREATE_FUNC_OVER:
  free(func.pCode);
  free(func.pComment);
S
Shengliang Guan 已提交
233
  mndTransDrop(pTrans);
S
Shengliang 已提交
234
  return code;
S
Shengliang Guan 已提交
235 236
}

S
Shengliang 已提交
237
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
S
Shengliang 已提交
238
  int32_t code = -1;
S
Shengliang 已提交
239
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
S
Shengliang 已提交
240 241
  if (pTrans == NULL) goto DROP_FUNC_OVER;

S
Shengliang Guan 已提交
242 243 244
  mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);

  SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
245
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
246 247 248
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);

  SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
249
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
250 251 252
  sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);

  SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
253
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
254 255
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);

S
Shengliang 已提交
256 257 258
  if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER;

  code = 0;
S
Shengliang Guan 已提交
259

S
Shengliang 已提交
260
DROP_FUNC_OVER:
S
Shengliang Guan 已提交
261
  mndTransDrop(pTrans);
S
Shengliang 已提交
262
  return code;
S
Shengliang Guan 已提交
263 264
}

S
Shengliang 已提交
265
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
266 267 268 269 270 271 272 273 274 275
  SMnode        *pMnode = pReq->pMnode;
  int32_t        code = -1;
  SUserObj      *pUser = NULL;
  SFuncObj      *pFunc = NULL;
  SCreateFuncReq createReq = {0};

  if (tDeserializeSCreateFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CREATE_FUNC_OVER;
  }
S
Shengliang Guan 已提交
276

S
Shengliang Guan 已提交
277
  mDebug("func:%s, start to create", createReq.name);
S
Shengliang Guan 已提交
278

S
Shengliang Guan 已提交
279
  pFunc = mndAcquireFunc(pMnode, createReq.name);
S
Shengliang Guan 已提交
280
  if (pFunc != NULL) {
S
Shengliang Guan 已提交
281 282 283 284
    if (createReq.igExists) {
      mDebug("func:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto CREATE_FUNC_OVER;
S
Shengliang 已提交
285 286
    } else {
      terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
S
Shengliang Guan 已提交
287
      goto CREATE_FUNC_OVER;
S
Shengliang 已提交
288
    }
S
Shengliang 已提交
289
  } else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
S
Shengliang Guan 已提交
290
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
291 292
  }

S
Shengliang Guan 已提交
293
  if (createReq.name[0] == 0) {
S
Shengliang Guan 已提交
294
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
S
Shengliang Guan 已提交
295
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
296 297
  }

S
Shengliang Guan 已提交
298
  if (createReq.commentSize <= 0 || createReq.commentSize > TSDB_FUNC_COMMENT_LEN) {
S
Shengliang Guan 已提交
299
    terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
S
Shengliang Guan 已提交
300
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
301 302
  }

S
Shengliang Guan 已提交
303
  if (createReq.codeSize <= 0 || createReq.codeSize > TSDB_FUNC_CODE_LEN) {
S
Shengliang Guan 已提交
304
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
S
Shengliang Guan 已提交
305
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
306 307
  }

S
Shengliang Guan 已提交
308
  if (createReq.pCode[0] == 0) {
S
Shengliang Guan 已提交
309
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
S
Shengliang Guan 已提交
310
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
311 312
  }

S
Shengliang Guan 已提交
313
  if (createReq.bufSize <= 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
S
Shengliang Guan 已提交
314
    terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
S
Shengliang Guan 已提交
315
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
316 317
  }

S
Shengliang Guan 已提交
318 319 320 321 322 323
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto CREATE_FUNC_OVER;
  }

S
Shengliang Guan 已提交
324
  if (mndCheckFuncAuth(pUser)) {
S
Shengliang Guan 已提交
325 326 327 328 329 330 331 332 333
    goto CREATE_FUNC_OVER;
  }

  code = mndCreateFunc(pMnode, pReq, &createReq);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;

CREATE_FUNC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("func:%s, failed to create since %s", createReq.name, terrstr());
S
Shengliang Guan 已提交
334 335
  }

S
Shengliang Guan 已提交
336 337 338 339
  mndReleaseFunc(pMnode, pFunc);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
340 341
}

S
Shengliang 已提交
342
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
343 344 345 346 347 348 349 350 351 352
  SMnode      *pMnode = pReq->pMnode;
  int32_t      code = -1;
  SUserObj    *pUser = NULL;
  SFuncObj    *pFunc = NULL;
  SDropFuncReq dropReq = {0};

  if (tDeserializeSDropFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto DROP_FUNC_OVER;
  }
S
Shengliang Guan 已提交
353

S
Shengliang Guan 已提交
354
  mDebug("func:%s, start to drop", dropReq.name);
S
Shengliang Guan 已提交
355

S
Shengliang Guan 已提交
356
  if (dropReq.name[0] == 0) {
S
Shengliang Guan 已提交
357
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
S
Shengliang Guan 已提交
358
    goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
359 360
  }

S
Shengliang Guan 已提交
361
  pFunc = mndAcquireFunc(pMnode, dropReq.name);
S
Shengliang Guan 已提交
362
  if (pFunc == NULL) {
S
Shengliang Guan 已提交
363 364 365 366
    if (dropReq.igNotExists) {
      mDebug("func:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
      goto DROP_FUNC_OVER;
S
Shengliang 已提交
367 368
    } else {
      terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
S
Shengliang Guan 已提交
369
      goto DROP_FUNC_OVER;
S
Shengliang 已提交
370
    }
S
Shengliang Guan 已提交
371 372
  }

S
Shengliang Guan 已提交
373 374 375 376 377 378
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    goto DROP_FUNC_OVER;
  }

S
Shengliang Guan 已提交
379
  if (mndCheckFuncAuth(pUser)) {
S
Shengliang Guan 已提交
380 381 382 383 384
    goto DROP_FUNC_OVER;
  }

  code = mndDropFunc(pMnode, pReq, pFunc);
  if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
S
Shengliang 已提交
385

S
Shengliang Guan 已提交
386 387 388
DROP_FUNC_OVER:
  if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mError("func:%s, failed to drop since %s", dropReq.name, terrstr());
S
Shengliang Guan 已提交
389 390
  }

S
Shengliang Guan 已提交
391 392 393 394
  mndReleaseFunc(pMnode, pFunc);
  mndReleaseUser(pMnode, pUser);

  return code;
S
Shengliang Guan 已提交
395 396
}

S
Shengliang 已提交
397
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
398 399 400 401 402 403 404 405 406
  SMnode          *pMnode = pReq->pMnode;
  int32_t          code = -1;
  SRetrieveFuncReq retrieveReq = {0};
  SRetrieveFuncRsp retrieveRsp = {0};

  if (tDeserializeSRetrieveFuncReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto RETRIEVE_FUNC_OVER;
  }
S
Shengliang Guan 已提交
407

S
Shengliang Guan 已提交
408
  if (retrieveReq.numOfFuncs <= 0 || retrieveReq.numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
S
Shengliang 已提交
409
    terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
S
Shengliang Guan 已提交
410
    goto RETRIEVE_FUNC_OVER;
S
Shengliang 已提交
411
  }
S
Shengliang Guan 已提交
412

S
Shengliang Guan 已提交
413 414 415
  retrieveRsp.numOfFuncs = retrieveReq.numOfFuncs;
  retrieveRsp.pFuncInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncInfo));
  if (retrieveRsp.pFuncInfos == NULL) {
S
Shengliang 已提交
416
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
417
    goto RETRIEVE_FUNC_OVER;
S
Shengliang 已提交
418 419
  }

S
Shengliang Guan 已提交
420 421
  for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) {
    char *funcName = taosArrayGet(retrieveReq.pFuncNames, i);
S
Shengliang Guan 已提交
422

S
Shengliang 已提交
423
    SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
S
Shengliang Guan 已提交
424 425
    if (pFunc == NULL) {
      terrno = TSDB_CODE_MND_INVALID_FUNC;
S
Shengliang Guan 已提交
426
      goto RETRIEVE_FUNC_OVER;
S
Shengliang Guan 已提交
427 428
    }

S
Shengliang Guan 已提交
429 430 431 432 433 434 435 436 437 438 439 440 441
    SFuncInfo funcInfo = {0};
    memcpy(funcInfo.name, pFunc->name, TSDB_FUNC_NAME_LEN);
    funcInfo.funcType = pFunc->funcType;
    funcInfo.scriptType = pFunc->scriptType;
    funcInfo.outputType = pFunc->outputType;
    funcInfo.outputLen = pFunc->outputLen;
    funcInfo.bufSize = pFunc->bufSize;
    funcInfo.signature = pFunc->signature;
    funcInfo.commentSize = pFunc->commentSize;
    funcInfo.codeSize = pFunc->codeSize;
    memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
    memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
    taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
S
Shengliang 已提交
442
    mndReleaseFunc(pMnode, pFunc);
S
Shengliang Guan 已提交
443 444
  }

S
Shengliang Guan 已提交
445 446 447 448 449 450 451 452 453 454 455
  int32_t contLen = tSerializeSRetrieveFuncRsp(NULL, 0, &retrieveRsp);
  void   *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto RETRIEVE_FUNC_OVER;
  }

  tSerializeSRetrieveFuncRsp(pRsp, contLen, &retrieveRsp);

  pReq->pCont = pRsp;
  pReq->contLen = contLen;
S
Shengliang Guan 已提交
456

S
Shengliang 已提交
457 458
  code = 0;

S
Shengliang Guan 已提交
459 460 461
RETRIEVE_FUNC_OVER:
  taosArrayDestroy(retrieveReq.pFuncNames);
  taosArrayDestroy(retrieveRsp.pFuncInfos);
S
Shengliang 已提交
462 463

  return code;
S
Shengliang Guan 已提交
464 465
}

S
Shengliang 已提交
466 467
static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
468 469 470
  SSdb   *pSdb = pMnode->pSdb;

  int32_t  cols = 0;
S
Shengliang Guan 已提交
471
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
472 473 474 475

  pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
S
Shengliang Guan 已提交
476
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
477 478 479 480 481
  cols++;

  pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "comment");
S
Shengliang Guan 已提交
482
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
483 484 485 486 487
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "aggregate");
S
Shengliang Guan 已提交
488
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
489 490 491 492 493
  cols++;

  pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "outputtype");
S
Shengliang Guan 已提交
494
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
495 496 497 498 499
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
S
Shengliang Guan 已提交
500
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
501 502 503 504 505
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "code_len");
S
Shengliang Guan 已提交
506
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
507 508 509 510 511
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "bufsize");
S
Shengliang Guan 已提交
512
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
513 514
  cols++;

S
Shengliang Guan 已提交
515
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
516 517 518 519 520 521 522 523 524
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
525
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
526 527 528 529 530 531 532 533 534 535 536

  return 0;
}

static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t len) {
  char *msg = "unknown";
  if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
    return msg;
  }

  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
S
Shengliang Guan 已提交
537
    int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len;
S
Shengliang Guan 已提交
538 539 540 541 542 543 544 545 546 547

    snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes);
    buf[buflen - 1] = 0;

    return buf;
  }

  return tDataTypes[type].name;
}

S
Shengliang 已提交
548 549
static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SFuncObj *pFunc = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  char      buf[TSDB_TYPE_STR_MAX_LEN];

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_FUNC, pShow->pIter, (void **)&pFunc);
    if (pShow->pIter == NULL) break;

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->name, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->pComment, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE ? 1 : 0;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen),
                               pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pFunc->createdTime;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->codeSize;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->bufSize;
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pFunc);
  }

S
Shengliang Guan 已提交
596
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
597 598 599 600 601 602 603 604
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}