mndFunc.c 19.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndFunc.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22
#define SDB_FUNC_VER 1
S
Shengliang Guan 已提交
23
#define SDB_FUNC_RESERVE_SIZE 64
S
Shengliang Guan 已提交
24 25 26 27 28

static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t  mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t  mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
S
Shengliang 已提交
29
static int32_t  mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
S
Shengliang 已提交
30 31 32 33 34 35 36
static int32_t  mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
static int32_t  mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
static int32_t  mndProcessCreateFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessDropFuncReq(SMnodeMsg *pReq);
static int32_t  mndProcessRetrieveFuncReq(SMnodeMsg *pReq);
static int32_t  mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t  mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47
static void     mndCancelGetNextFunc(SMnode *pMnode, void *pIter);

/*
 * Register everything the mnode needs to manage user-defined functions:
 * the request handlers, the SHOW handlers, and the SDB table descriptor.
 * Returns the result of sdbSetTable().
 */
int32_t mndInitFunc(SMnode *pMnode) {
  /* Request handlers for create / drop / retrieve. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);

  /* SHOW support: meta description, row retrieval and iterator cleanup. */
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);

  /* SDB table descriptor; every field not set below stays zero. */
  SSdbTable funcTable = {0};
  funcTable.sdbType = SDB_FUNC;
  funcTable.keyType = SDB_KEY_BINARY;
  funcTable.encodeFp = (SdbEncodeFp)mndFuncActionEncode;
  funcTable.decodeFp = (SdbDecodeFp)mndFuncActionDecode;
  funcTable.insertFp = (SdbInsertFp)mndFuncActionInsert;
  funcTable.updateFp = (SdbUpdateFp)mndFuncActionUpdate;
  funcTable.deleteFp = (SdbDeleteFp)mndFuncActionDelete;

  return sdbSetTable(pMnode->pSdb, funcTable);
}

/* Counterpart of mndInitFunc(); currently there is nothing to release. */
void mndCleanupFunc(SMnode *pMnode) {
}

/*
 * Serialize a function object into a newly allocated SDB raw record.
 *
 * Fields are written in a fixed order (name, timestamps/flags, sizes, then
 * the comment and code blobs, then a reserved area); the decoder reads them
 * back in the same order.
 *
 * Returns the raw record, or NULL with terrno set when allocation or any
 * field write fails.
 */
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
  int32_t dataPos = 0;

  /* Pessimistic default: cleared only after every field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj) + SDB_FUNC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, rawSize);
  if (pRaw == NULL) {
    goto FUNC_ENCODE_DONE;
  }

  /* Fixed-size header fields. */
  SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_ENCODE_DONE)
  SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, FUNC_ENCODE_DONE)
  SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, FUNC_ENCODE_DONE)
  SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, FUNC_ENCODE_DONE)
  SDB_SET_INT8(pRaw, dataPos, pFunc->align, FUNC_ENCODE_DONE)
  SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_DONE)
  SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_DONE)
  SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_DONE)
  SDB_SET_INT64(pRaw, dataPos, pFunc->signature, FUNC_ENCODE_DONE)

  /* Blob sizes first, then the blobs themselves. */
  SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_DONE)
  SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_DONE)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_DONE)
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_ENCODE_DONE)

  SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, FUNC_ENCODE_DONE)
  SDB_SET_DATALEN(pRaw, dataPos, FUNC_ENCODE_DONE);

  terrno = 0;

FUNC_ENCODE_DONE:
  if (terrno == 0) {
    mTrace("func:%s, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
    return pRaw;
  }

  mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Rebuild a function row from an SDB raw record.
 *
 * Fields are read in the same order mndFuncActionEncode() writes them. The
 * comment and code blobs are heap-allocated here and owned by the returned
 * row object (they are released in mndFuncActionDelete()).
 *
 * Returns the decoded row, or NULL with terrno set on any failure. On
 * failure every allocation made by this function is released.
 */
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
  /*
   * These are declared and initialised before the first goto so the error
   * path below never reads an indeterminate pointer.
   */
  SSdbRow  *pRow = NULL;
  SFuncObj *pFunc = NULL;
  int32_t   dataPos = 0;
  int8_t    sver = 0;

  /* Pessimistic default: cleared only after every field has been read. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto FUNC_DECODE_OVER;

  if (sver != SDB_FUNC_VER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto FUNC_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SFuncObj));
  if (pRow == NULL) goto FUNC_DECODE_OVER;

  pFunc = sdbGetRowObj(pRow);
  if (pFunc == NULL) goto FUNC_DECODE_OVER;

  /* Make the cleanup path safe even if a field read fails before the blobs are allocated. */
  pFunc->pComment = NULL;
  pFunc->pCode = NULL;

  SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->align, FUNC_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)

  /* terrno is still TSDB_CODE_OUT_OF_MEMORY here, so a failed calloc reports OOM. */
  pFunc->pComment = calloc(1, pFunc->commentSize);
  pFunc->pCode = calloc(1, pFunc->codeSize);
  if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
    goto FUNC_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER)
  SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER)
  SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, FUNC_DECODE_OVER)

  terrno = 0;

FUNC_DECODE_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to decode from raw:%p since %s", pFunc == NULL ? "null" : pFunc->name, pRaw, terrstr());
    if (pFunc != NULL) {
      /* The blobs are owned by the row object obtained from pRow, so free them before the row. */
      tfree(pFunc->pComment);
      tfree(pFunc->pCode);
    }
    tfree(pRow);
    return NULL;
  }

  mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
  return pRow;
}

/* SDB insert hook for function rows: only traces the event and reports success. */
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
  mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);

  return 0;
}

/*
 * SDB delete hook for function rows: traces the event and releases the
 * comment and code buffers held by the object. Always returns 0.
 */
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
  mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);

  /* The two buffers are independent allocations. */
  tfree(pFunc->pComment);
  tfree(pFunc->pCode);

  return 0;
}

S
Shengliang 已提交
163 164
/* SDB update hook for function rows: only traces the event; nothing is copied from pNew to pOld. */
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
  mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  return 0;
}

S
Shengliang 已提交
168 169 170 171 172 173 174 175
/*
 * Look up a function object by name in the SDB.
 *
 * Returns the object on success. On failure returns NULL; the generic
 * TSDB_CODE_SDB_OBJ_NOT_THERE error is translated to
 * TSDB_CODE_MND_FUNC_NOT_EXIST, any other terrno is left untouched.
 * A non-NULL result is handed back to the SDB via mndReleaseFunc().
 */
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
  SFuncObj *pFound = sdbAcquire(pMnode->pSdb, SDB_FUNC, funcName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
  }
  return NULL;
}
S
Shengliang Guan 已提交
176

S
Shengliang 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
/* Hand a function object obtained from mndAcquireFunc() back to the SDB. */
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
  sdbRelease(pMnode->pSdb, pFunc);
}

/*
 * Build a function object from a create request and submit a transaction
 * that inserts it into the SDB.
 *
 * pCreate is expected to be already converted to host byte order and
 * validated by the caller; pCreate->pCont holds the comment bytes followed
 * immediately by the code bytes.
 *
 * Returns 0 once the transaction has been prepared, -1 otherwise (terrno is
 * set explicitly here on allocation failure).
 */
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SFuncObj func = {0};
  memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
  func.createdTime = taosGetTimestampMs();
  func.funcType = pCreate->funcType;
  func.scriptType = pCreate->scriptType;
  func.outputType = pCreate->outputType;
  func.outputLen = pCreate->outputLen;
  func.bufSize = pCreate->bufSize;
  func.signature = pCreate->signature;
  func.commentSize = pCreate->commentSize;
  func.codeSize = pCreate->codeSize;
  func.pComment = malloc(func.commentSize);
  func.pCode = malloc(func.codeSize);
  /* Both buffers must be checked: each one is written by a memcpy below. */
  if (func.pComment == NULL || func.pCode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CREATE_FUNC_OVER;
  }

  /* Payload layout: [comment bytes][code bytes]. */
  memcpy(func.pComment, pCreate->pCont, pCreate->commentSize);
  memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) goto CREATE_FUNC_OVER;

  mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);

  /* Redo log: the row in CREATING state. */
  SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER;

  /* Undo log: the row in DROPPED state. */
  SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER;

  /* Commit log: the row in READY state. */
  SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER;

  if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER;

  code = 0;

CREATE_FUNC_OVER:
  /* The local copies are only needed for encoding; free(NULL) is a no-op. */
  free(func.pCode);
  free(func.pComment);
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang 已提交
235
/*
 * Submit a transaction that removes an existing function object from the SDB.
 *
 * Three copies of the row are encoded: DROPPING for the redo log, READY for
 * the undo log (so a rollback restores the function) and DROPPED for the
 * commit log.
 *
 * Returns 0 once the transaction has been prepared, -1 on any failure.
 */
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
  if (pTrans == NULL) goto DROP_FUNC_OVER;

  mDebug("trans:%d, used to drop func:%s", pTrans->id, pFunc->name);

  /* Status updates are checked the same way mndCreateFunc() checks them. */
  SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
  if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) goto DROP_FUNC_OVER;

  SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
  if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) goto DROP_FUNC_OVER;

  SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) goto DROP_FUNC_OVER;

  if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER;

  code = 0;

DROP_FUNC_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang 已提交
263 264
/*
 * Handle a TDMT_MND_CREATE_FUNC request.
 *
 * Converts the multi-byte request fields to host byte order in place,
 * rejects duplicates (unless igExists is set) and invalid name / comment /
 * code / buffer sizes, then starts the create transaction.
 *
 * Returns 0 when the function already exists and igExists is set,
 * TSDB_CODE_MND_ACTION_IN_PROGRESS when the transaction was started, and -1
 * with terrno set otherwise.
 */
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
  SMnode *pMnode = pReq->pMnode;

  SCreateFuncReq *pCreate = pReq->rpcMsg.pCont;
  /* htonl/htobe64 are used here to convert from network to host order. */
  pCreate->outputLen = htonl(pCreate->outputLen);
  pCreate->bufSize = htonl(pCreate->bufSize);
  pCreate->signature = htobe64(pCreate->signature);
  pCreate->commentSize = htonl(pCreate->commentSize);
  pCreate->codeSize = htonl(pCreate->codeSize);

  mDebug("func:%s, start to create", pCreate->name);

  SFuncObj *pFunc = mndAcquireFunc(pMnode, pCreate->name);
  if (pFunc != NULL) {
    mndReleaseFunc(pMnode, pFunc);
    if (pCreate->igExists) {
      mDebug("func:%s, already exist, ignore exist is set", pCreate->name);
      return 0;
    } else {
      terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
      mError("func:%s, failed to create since %s", pCreate->name, terrstr());
      return -1;
    }
  } else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
    /*
     * NOTE(review): mndAcquireFunc() itself only ever sets
     * TSDB_CODE_MND_FUNC_NOT_EXIST, so this branch looks like it was meant to
     * be `terrno != TSDB_CODE_MND_FUNC_NOT_EXIST` -- confirm before changing.
     */
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->name[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->commentSize <= 0 || pCreate->commentSize > TSDB_FUNC_COMMENT_LEN) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->codeSize <= 0 || pCreate->codeSize > TSDB_FUNC_CODE_LEN) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  /* The payload must not start with a NUL byte. */
  if (pCreate->pCont[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  if (pCreate->bufSize <= 0 || pCreate->bufSize > TSDB_FUNC_BUF_SIZE) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
  if (code != 0) {
    mError("func:%s, failed to create since %s", pCreate->name, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang 已提交
330 331 332
/*
 * Handle a TDMT_MND_DROP_FUNC request.
 *
 * Returns 0 when the function does not exist and igNotExists is set,
 * TSDB_CODE_MND_ACTION_IN_PROGRESS when the drop transaction was started,
 * and -1 with terrno set otherwise.
 */
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
  SDropFuncReq *pDrop = pReq->rpcMsg.pCont;
  SMnode       *pMnode = pReq->pMnode;

  mDebug("func:%s, start to drop", pDrop->name);

  /* An empty name can never identify a function. */
  if (pDrop->name[0] == 0) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
    mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
    return -1;
  }

  SFuncObj *pTarget = mndAcquireFunc(pMnode, pDrop->name);

  /* Missing function: tolerated only when the caller asked for it. */
  if (pTarget == NULL && pDrop->igNotExists) {
    mDebug("func:%s, not exist, ignore not exist is set", pDrop->name);
    return 0;
  }
  if (pTarget == NULL) {
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
    mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
    return -1;
  }

  /* The object is released whether or not the transaction was started. */
  int32_t dropCode = mndDropFunc(pMnode, pReq, pTarget);
  mndReleaseFunc(pMnode, pTarget);

  if (dropCode == 0) {
    return TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

  mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
  return -1;
}

S
Shengliang 已提交
365
/*
 * Handle a TDMT_MND_RETRIEVE_FUNC request: look up every requested function
 * by name and pack its description, comment and code into one response.
 *
 * The response buffer is sized for the worst case (maximum comment and code
 * length per function); the reported content length covers only the bytes
 * actually written. Multi-byte fields are written with htonl/htobe64.
 *
 * Returns 0 with pReq->pCont / pReq->contLen filled in, or -1 with terrno
 * set (the response buffer is freed in that case).
 */
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
  SMnode           *pMnode = pReq->pMnode;
  SRetrieveFuncReq *pRetrieve = pReq->rpcMsg.pCont;

  pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
  if (pRetrieve->numOfFuncs <= 0 || pRetrieve->numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
    terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
    return -1;
  }

  /* Worst-case footprint of one entry, then of the whole response. */
  int32_t maxEntrySize = sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN + TSDB_FUNC_COMMENT_LEN;
  int32_t rspCapacity = sizeof(SRetrieveFuncRsp) + maxEntrySize * pRetrieve->numOfFuncs;

  SRetrieveFuncRsp *pRsp = rpcMallocCont(rspCapacity);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FUNC_RETRIEVE_FAILED;
  }

  pRsp->numOfFuncs = htonl(pRetrieve->numOfFuncs);
  char *pCursor = pRsp->pFuncInfos;

  for (int32_t idx = 0; idx < pRetrieve->numOfFuncs; ++idx) {
    /* Names arrive as fixed-width slots of TSDB_FUNC_NAME_LEN bytes. */
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
    memcpy(funcName, pRetrieve->pFuncNames + idx * TSDB_FUNC_NAME_LEN, TSDB_FUNC_NAME_LEN);

    SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
    if (pFunc == NULL) {
      terrno = TSDB_CODE_MND_INVALID_FUNC;
      mError("func:%s, failed to retrieve since %s", funcName, terrstr());
      goto FUNC_RETRIEVE_FAILED;
    }

    SFuncInfo *pInfo = (SFuncInfo *)pCursor;
    memcpy(pInfo->name, pFunc->name, TSDB_FUNC_NAME_LEN);
    pInfo->funcType = pFunc->funcType;
    pInfo->scriptType = pFunc->scriptType;
    pInfo->outputType = pFunc->outputType;
    pInfo->outputLen = htonl(pFunc->outputLen);
    pInfo->bufSize = htonl(pFunc->bufSize);
    pInfo->signature = htobe64(pFunc->signature);
    pInfo->commentSize = htonl(pFunc->commentSize);
    pInfo->codeSize = htonl(pFunc->codeSize);

    /* Variable part: comment bytes immediately followed by code bytes. */
    memcpy(pInfo->pCont, pFunc->pComment, pFunc->commentSize);
    memcpy(pInfo->pCont + pFunc->commentSize, pFunc->pCode, pFunc->codeSize);

    pCursor += (sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize);
    mndReleaseFunc(pMnode, pFunc);
  }

  pReq->pCont = pRsp;
  pReq->contLen = (int32_t)(pCursor - (char *)pRsp);
  return 0;

FUNC_RETRIEVE_FAILED:
  rpcFreeCont(pRsp);
  return -1;
}

S
Shengliang 已提交
426 427
/*
 * Describe the columns of the "show functions" result set.
 *
 * Fills pShow (per-column byte widths and offsets, row size, row count) and
 * pMeta (schema, column count, table name). Schema widths and the column
 * count in pMeta are stored via htonl(). Always returns 0.
 */
static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode  *pMnode = pReq->pMnode;
  SSdb    *pSdb = pMnode->pSdb;
  SSchema *pSchema = pMeta->pSchema;

  /* One entry per output column, in display order. */
  const struct {
    int32_t     bytes;
    int8_t      type;
    const char *name;
  } columns[] = {
      {TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE, TSDB_DATA_TYPE_BINARY, "name"},
      {PATH_MAX + VARSTR_HEADER_SIZE, TSDB_DATA_TYPE_BINARY, "comment"},
      {4, TSDB_DATA_TYPE_INT, "aggregate"},
      {TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE, TSDB_DATA_TYPE_BINARY, "outputtype"},
      {8, TSDB_DATA_TYPE_TIMESTAMP, "create_time"},
      {4, TSDB_DATA_TYPE_INT, "code_len"},
      {4, TSDB_DATA_TYPE_INT, "bufsize"},
  };
  const int32_t cols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  for (int32_t c = 0; c < cols; ++c) {
    pShow->bytes[c] = columns[c].bytes;
    pSchema[c].type = columns[c].type;
    strcpy(pSchema[c].name, columns[c].name);
    pSchema[c].bytes = htonl(pShow->bytes[c]);
  }

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  /* Each column starts where the previous one ends. */
  pShow->offset[0] = 0;
  for (int32_t c = 1; c < cols; ++c) {
    pShow->offset[c] = pShow->offset[c - 1] + pShow->bytes[c - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t len) {
  char *msg = "unknown";
  if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
    return msg;
  }

  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
S
Shengliang Guan 已提交
497
    int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len;
S
Shengliang Guan 已提交
498 499 500 501 502 503 504 505 506 507

    snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes);
    buf[buflen - 1] = 0;

    return buf;
  }

  return tDataTypes[type].name;
}

S
Shengliang 已提交
508 509
/* Address of the cell for column `col` of result row `row` in a block laid out for `rows` rows. */
static char *mndFuncShowCell(SShowObj *pShow, char *data, int32_t col, int32_t rows, int32_t row) {
  return data + pShow->offset[col] * rows + pShow->bytes[col] * row;
}

/*
 * Fill up to `rows` rows of the "show functions" result into `data`.
 *
 * Iterates the SDB function table through pShow->pIter, writes one cell per
 * column for each function, then compacts the block with mndVacuumResult().
 * Returns the number of rows written and adds it to pShow->numOfReads.
 */
static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  SSdb     *pSdb = pMnode->pSdb;
  SFuncObj *pFunc = NULL;
  int32_t   numOfRows = 0;
  char      typeBuf[TSDB_TYPE_STR_MAX_LEN];

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_FUNC, pShow->pIter, (void **)&pFunc);
    if (pShow->pIter == NULL) {
      break;
    }

    int32_t col = 0;
    char   *pCell = NULL;

    /* name */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pFunc->name, pShow->bytes[col]);
    col++;

    /* comment */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pFunc->pComment, pShow->bytes[col]);
    col++;

    /* aggregate flag */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    *(int32_t *)pCell = pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE ? 1 : 0;
    col++;

    /* output type, rendered as text */
    char *typeStr = mnodeGenTypeStr(typeBuf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen);
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, typeStr, pShow->bytes[col]);
    col++;

    /* creation time */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    *(int64_t *)pCell = pFunc->createdTime;
    col++;

    /* code length */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    *(int32_t *)pCell = pFunc->codeSize;
    col++;

    /* buffer size */
    pCell = mndFuncShowCell(pShow, data, col, rows, numOfRows);
    *(int32_t *)pCell = pFunc->bufSize;
    col++;

    numOfRows++;
    sdbRelease(pSdb, pFunc);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/* Release an in-progress SDB iterator left over from a "show functions" scan. */
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}