mndFunc.c 19.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndFunc.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22 23 24 25 26 27
#define SDB_FUNC_VER 1

static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t  mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t  mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
S
Shengliang 已提交
28
static int32_t  mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
S
Shengliang 已提交
29 30 31 32 33 34 35
static int32_t  mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
static int32_t  mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
static int32_t  mndProcessCreateFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessDropFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessRetrieveFuncReq(SMnodeMsg *pReq);
static int32_t  mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t  mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
36 37 38 39 40 41 42 43 44 45 46
static void     mndCancelGetNextFunc(SMnode *pMnode, void *pIter);

int32_t mndInitFunc(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_FUNC,
                     .keyType = SDB_KEY_BINARY,
                     .encodeFp = (SdbEncodeFp)mndFuncActionEncode,
                     .decodeFp = (SdbDecodeFp)mndFuncActionDecode,
                     .insertFp = (SdbInsertFp)mndFuncActionInsert,
                     .updateFp = (SdbUpdateFp)mndFuncActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndFuncActionDelete};

S
Shengliang 已提交
47 48 49
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
S
Shengliang Guan 已提交
50

S
Shengliang 已提交
51 52 53
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
S
Shengliang Guan 已提交
54

S
Shengliang Guan 已提交
55 56 57 58 59 60
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupFunc(SMnode *pMnode) {}

static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
61 62
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
63 64
  int32_t  size = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj);
  SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, size);
65
  if (pRaw == NULL) goto FUNC_ENCODE_OVER;
S
Shengliang Guan 已提交
66 67

  int32_t dataPos = 0;
68 69 70 71 72 73 74 75
  SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->align, FUNC_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER)
S
Shengliang 已提交
76
  SDB_SET_INT64(pRaw, dataPos, pFunc->signature, FUNC_ENCODE_OVER)
77 78 79 80 81 82 83 84 85 86 87 88 89 90
  SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, FUNC_ENCODE_OVER);

  terrno = 0;

FUNC_ENCODE_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }
S
Shengliang Guan 已提交
91

S
Shengliang Guan 已提交
92
  mTrace("func:%s, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
S
Shengliang Guan 已提交
93 94 95 96
  return pRaw;
}

static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
97 98
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
99
  int8_t sver = 0;
100
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
101 102 103

  if (sver != SDB_FUNC_VER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
104
    goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
105 106
  }

S
Shengliang 已提交
107
  SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj));
108 109
  if (pRow == NULL) goto FUNC_DECODE_OVER;

S
Shengliang Guan 已提交
110
  SFuncObj *pFunc = sdbGetRowObj(pRow);
111
  if (pFunc == NULL) goto FUNC_DECODE_OVER;
S
Shengliang Guan 已提交
112 113

  int32_t dataPos = 0;
114 115 116 117 118 119 120 121
  SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->align, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER)
S
Shengliang 已提交
122
  SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER)
123 124
  SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)
S
Shengliang 已提交
125 126 127 128 129 130 131 132 133

  pFunc->pComment = calloc(1, pFunc->commentSize);
  pFunc->pCode = calloc(1, pFunc->codeSize);
  if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
    goto FUNC_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER)
S
Shengliang Guan 已提交
134

135 136 137 138 139 140 141 142 143 144
  terrno = 0;

FUNC_DECODE_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
S
Shengliang Guan 已提交
145 146 147 148
  return pRow;
}

static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
149
  mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);
S
Shengliang Guan 已提交
150 151 152 153
  return 0;
}

static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
154
  mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);
S
Shengliang 已提交
155 156
  tfree(pFunc->pCode);
  tfree(pFunc->pComment);
S
Shengliang Guan 已提交
157 158 159
  return 0;
}

S
Shengliang 已提交
160 161
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
  mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
S
Shengliang Guan 已提交
162 163 164
  return 0;
}

S
Shengliang 已提交
165 166 167 168 169 170 171 172
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
  SSdb     *pSdb = pMnode->pSdb;
  SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
  if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
  }
  return pFunc;
}
S
Shengliang Guan 已提交
173

S
Shengliang 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pFunc);
}

static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SFuncObj func = {0};
  memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
  func.createdTime = taosGetTimestampMs();
  func.funcType = pCreate->funcType;
  func.scriptType = pCreate->scriptType;
  func.outputType = pCreate->outputType;
  func.outputLen = pCreate->outputLen;
  func.bufSize = pCreate->bufSize;
  func.signature = pCreate->signature;
  func.commentSize = pCreate->commentSize;
  func.codeSize = pCreate->codeSize;
  func.pComment = malloc(func.commentSize);
  func.pCode = malloc(func.codeSize);
  if (func.pCode == NULL || func.pCode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
199 200
  }

S
Shengliang 已提交
201 202 203 204 205 206
  memcpy(func.pComment, pCreate->pCont, pCreate->commentSize);
  memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) goto CREATE_FUNC_OVER;

S
Shengliang Guan 已提交
207 208
  mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);

S
Shengliang 已提交
209 210 211
  SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
212

S
Shengliang 已提交
213 214 215
  SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
216

S
Shengliang 已提交
217 218 219
  SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
220

S
Shengliang 已提交
221
  if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER;
S
Shengliang Guan 已提交
222

S
Shengliang 已提交
223 224 225 226 227
  code = 0;

CREATE_FUNC_OVER:
  free(func.pCode);
  free(func.pComment);
S
Shengliang Guan 已提交
228
  mndTransDrop(pTrans);
S
Shengliang 已提交
229
  return code;
S
Shengliang Guan 已提交
230 231
}

S
Shengliang 已提交
232
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
S
Shengliang 已提交
233
  int32_t code = -1;
S
Shengliang 已提交
234
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
S
Shengliang 已提交
235 236
  if (pTrans == NULL) goto DROP_FUNC_OVER;

S
Shengliang Guan 已提交
237 238 239
  mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);

  SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
240
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
241 242 243
  sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);

  SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
244
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
245 246 247
  sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);

  SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
S
Shengliang 已提交
248
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER;
S
Shengliang Guan 已提交
249 250
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);

S
Shengliang 已提交
251 252 253
  if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER;

  code = 0;
S
Shengliang Guan 已提交
254

S
Shengliang 已提交
255
DROP_FUNC_OVER:
S
Shengliang Guan 已提交
256
  mndTransDrop(pTrans);
S
Shengliang 已提交
257
  return code;
S
Shengliang Guan 已提交
258 259
}

S
Shengliang 已提交
260 261
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
262

S
Shengliang 已提交
263
  SCreateFuncReq *pCreate = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
264 265
  pCreate->outputLen = htonl(pCreate->outputLen);
  pCreate->bufSize = htonl(pCreate->bufSize);
S
Shengliang 已提交
266
  pCreate->signature = htobe64(pCreate->signature);
S
Shengliang Guan 已提交
267 268 269 270 271
  pCreate->commentSize = htonl(pCreate->commentSize);
  pCreate->codeSize = htonl(pCreate->codeSize);

  mDebug("func:%s, start to create", pCreate->name);

S
Shengliang 已提交
272
  SFuncObj *pFunc = mndAcquireFunc(pMnode, pCreate->name);
S
Shengliang Guan 已提交
273
  if (pFunc != NULL) {
S
Shengliang 已提交
274
    mndReleaseFunc(pMnode, pFunc);
S
Shengliang 已提交
275 276 277 278 279 280 281 282
    if (pCreate->igExists) {
      mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
      mError("func:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
S
Shengliang 已提交
283
  } else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
S
Shengliang 已提交
284
    mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
S
Shengliang Guan 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
    return -1;
  }

  if (pCreate->name[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->commentSize <= 0 || pCreate->commentSize > TSDB_FUNC_COMMENT_LEN) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->codeSize <= 0 || pCreate->codeSize > TSDB_FUNC_CODE_LEN) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->pCont[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

S
Shengliang 已提交
312
  if (pCreate->bufSize <= 0 || pCreate->bufSize > TSDB_FUNC_BUF_SIZE) {
S
Shengliang Guan 已提交
313 314 315 316 317
    terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

S
Shengliang 已提交
318
  int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
S
Shengliang Guan 已提交
319 320 321 322 323 324 325 326
  if (code != 0) {
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang 已提交
327 328 329
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SDropFuncReq *pDrop = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
330 331 332 333 334 335 336 337 338

  mDebug("func:%s, start to drop", pDrop->name);

  if (pDrop->name[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
    mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
    return -1;
  }

S
Shengliang 已提交
339
  SFuncObj *pFunc = mndAcquireFunc(pMnode, pDrop->name);
S
Shengliang Guan 已提交
340
  if (pFunc == NULL) {
S
Shengliang 已提交
341 342 343 344 345 346 347 348
    if (pDrop->igNotExists) {
      mDebug("func:%s, not exist, ignore not exist is set", pDrop->name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
      mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
      return -1;
    }
S
Shengliang Guan 已提交
349 350
  }

S
Shengliang 已提交
351
  int32_t code = mndDropFunc(pMnode, pReq, pFunc);
S
Shengliang 已提交
352 353
  mndReleaseFunc(pMnode, pFunc);

S
Shengliang Guan 已提交
354 355 356 357 358 359 360 361
  if (code != 0) {
    mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang 已提交
362
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
S
Shengliang 已提交
363
  int32_t code = -1;
S
Shengliang 已提交
364
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
365

S
Shengliang 已提交
366
  SRetrieveFuncReq *pRetrieve = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
367
  pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
S
Shengliang 已提交
368 369 370 371
  if (pRetrieve->numOfFuncs <= 0 || pRetrieve->numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
    return -1;
  }
S
Shengliang Guan 已提交
372

S
Shengliang 已提交
373 374
  int32_t fsize = sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN + TSDB_FUNC_COMMENT_LEN;
  int32_t size = sizeof(SRetrieveFuncRsp) + fsize * pRetrieve->numOfFuncs;
S
Shengliang Guan 已提交
375 376

  SRetrieveFuncRsp *pRetrieveRsp = rpcMallocCont(size);
S
Shengliang 已提交
377 378 379 380 381
  if (pRetrieveRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FUNC_RETRIEVE_OVER;
  }

S
Shengliang Guan 已提交
382 383 384 385 386 387 388
  pRetrieveRsp->numOfFuncs = htonl(pRetrieve->numOfFuncs);
  char *pOutput = pRetrieveRsp->pFuncInfos;

  for (int32_t i = 0; i < pRetrieve->numOfFuncs; ++i) {
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
    memcpy(funcName, pRetrieve->pFuncNames + i * TSDB_FUNC_NAME_LEN, TSDB_FUNC_NAME_LEN);

S
Shengliang 已提交
389
    SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
S
Shengliang Guan 已提交
390 391 392
    if (pFunc == NULL) {
      terrno = TSDB_CODE_MND_INVALID_FUNC;
      mError("func:%s, failed to retrieve since %s", funcName, terrstr());
S
Shengliang 已提交
393
      goto FUNC_RETRIEVE_OVER;
S
Shengliang Guan 已提交
394 395 396
    }

    SFuncInfo *pFuncInfo = (SFuncInfo *)pOutput;
S
Shengliang 已提交
397
    memcpy(pFuncInfo->name, pFunc->name, TSDB_FUNC_NAME_LEN);
S
Shengliang Guan 已提交
398 399 400 401 402
    pFuncInfo->funcType = pFunc->funcType;
    pFuncInfo->scriptType = pFunc->scriptType;
    pFuncInfo->outputType = pFunc->outputType;
    pFuncInfo->outputLen = htonl(pFunc->outputLen);
    pFuncInfo->bufSize = htonl(pFunc->bufSize);
S
Shengliang 已提交
403
    pFuncInfo->signature = htobe64(pFunc->signature);
S
Shengliang Guan 已提交
404 405
    pFuncInfo->commentSize = htonl(pFunc->commentSize);
    pFuncInfo->codeSize = htonl(pFunc->codeSize);
S
Shengliang 已提交
406 407
    memcpy(pFuncInfo->pCont, pFunc->pComment, pFunc->commentSize);
    memcpy(pFuncInfo->pCont + pFunc->commentSize, pFunc->pCode, pFunc->codeSize);
S
Shengliang 已提交
408
    pOutput += (sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize);
S
Shengliang 已提交
409
    mndReleaseFunc(pMnode, pFunc);
S
Shengliang Guan 已提交
410 411
  }

S
Shengliang 已提交
412 413
  pReq->pCont = pRetrieveRsp;
  pReq->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp);
S
Shengliang Guan 已提交
414

S
Shengliang 已提交
415 416 417 418 419 420
  code = 0;

FUNC_RETRIEVE_OVER:
  if (code != 0) rpcFreeCont(pRetrieveRsp);

  return code;
S
Shengliang Guan 已提交
421 422
}

S
Shengliang 已提交
423 424
static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
425 426 427
  SSdb   *pSdb = pMnode->pSdb;

  int32_t  cols = 0;
S
Shengliang Guan 已提交
428
  SSchema *pSchema = pMeta->pSchema;
S
Shengliang Guan 已提交
429 430 431 432

  pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "name");
H
Haojun Liao 已提交
433
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
434 435 436 437 438
  cols++;

  pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "comment");
H
Haojun Liao 已提交
439
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
440 441 442 443 444
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "aggregate");
H
Haojun Liao 已提交
445
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
446 447 448 449 450
  cols++;

  pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "outputtype");
H
Haojun Liao 已提交
451
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
452 453 454 455 456
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
H
Haojun Liao 已提交
457
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
458 459 460 461 462
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "code_len");
H
Haojun Liao 已提交
463
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
464 465 466 467 468
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "bufsize");
H
Haojun Liao 已提交
469
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
470 471
  cols++;

H
Haojun Liao 已提交
472
  pMeta->numOfColumns = htonl(cols);
S
Shengliang Guan 已提交
473 474 475 476 477 478 479 480 481
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
S
Shengliang 已提交
482
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
483 484 485 486 487 488 489 490 491 492 493

  return 0;
}

static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t len) {
  char *msg = "unknown";
  if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
    return msg;
  }

  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
S
Shengliang Guan 已提交
494
    int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len;
S
Shengliang Guan 已提交
495 496 497 498 499 500 501 502 503 504

    snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes);
    buf[buflen - 1] = 0;

    return buf;
  }

  return tDataTypes[type].name;
}

S
Shengliang 已提交
505 506
static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SFuncObj *pFunc = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  char      buf[TSDB_TYPE_STR_MAX_LEN];

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_FUNC, pShow->pIter, (void **)&pFunc);
    if (pShow->pIter == NULL) break;

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->name, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->pComment, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE ? 1 : 0;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen),
                               pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pFunc->createdTime;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->codeSize;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pFunc->bufSize;
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pFunc);
  }

S
Shengliang Guan 已提交
553
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
554 555 556 557 558 559 560 561
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}