vnodeSvr.c 31.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/* Handlers invoked from vnodeProcessWriteMsg(); every one shares the same signature. */

/* super table */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* vnode alteration */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
32

33
/**
 * Rewrite a write message in place, depending on its type.
 *
 * - TDMT_VND_CREATE_TABLE: for every request in the batch a uid is generated and written, together with the
 *   current timestamp, into the message body at the decoder position reached after the table name.
 * - TDMT_VND_SUBMIT: for every block with a non-empty schema part the uid is looked up by table name
 *   (a new one is generated when the lookup yields 0), uid and timestamp are written into the block data
 *   and the big-endian uid is stored in the block header.
 * - TDMT_VND_DELETE: the message is run through qWorkerProcessDeleteMsg() and its content is replaced
 *   by a freshly allocated buffer holding an SMsgHead followed by the encoded SDeleteRes.
 * - Any other type is left untouched.
 *
 * @param pVnode vnode the message is addressed to
 * @param pMsg   message to rewrite; its pCont/contLen may be replaced
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;  // keeps the loop bound defined even if the decode below stores nothing

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
        tDecodeCStr(&dc, &name);
        // patch uid and creation time into the raw message at the current decode position
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
          tDecodeCStr(&dc, &name);

          // reuse the uid of an existing table, otherwise allocate a new one
          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      int32_t     size;
      int32_t     ret;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // without this check the header writes below would dereference NULL
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
      ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      // swap the original request content for the encoded delete result
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

139
/**
 * Apply one write message to the vnode.
 *
 * Stores `version` and the apply term carried by the message in pVnode->state, dispatches on
 * pMsg->msgType to the matching handler, pushes the message to TQ and finally commits when
 * vnodeShouldCommit() asks for it. TDMT_VND_COMMIT jumps directly to the commit step and therefore
 * skips the TQ push.
 *
 * @param pVnode  target vnode
 * @param pMsg    message to apply; pCont starts with an SMsgHead
 * @param version version (log index) of the message
 * @param pRsp    response filled in by the individual handlers
 * @return 0 on success, -1 when a checked handler or the TQ push fails
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler receives the full content, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      // full content, header included
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_CHECK_ALTER_INFO:
      if (tqProcessCheckAlterInfoReq(pVnode->pTq, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      // full content, header included
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    /* the return value of the three alter handlers below is not checked */
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

268
/**
 * Pre-process a query message.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to qWorkerPreprocessQueryMsg();
 * every other message type is accepted as is.
 *
 * @return 0 for message types that need no pre-processing, otherwise the query worker's result
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/**
 * Process a message taken from the vnode query queue.
 *
 * A TDMT_SCH_QUERY arriving on a vnode that is not the leader is redirected and reported as
 * handled (0). Otherwise the message is passed to the query worker according to its type.
 *
 * @return the query worker's result, 0 after a redirect, or TSDB_CODE_VND_APP_ERROR for an
 *         unknown message type
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_SCH_QUERY) {
    if (!vnodeIsLeader(pVnode)) {
      vnodeRedirectRpcMsg(pVnode, pMsg);
      return 0;
    }
  }

  SReadHandle readHandle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

296
/**
 * Process a message taken from the vnode fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META, TDMT_VND_TABLE_CFG and TDMT_VND_BATCH_META are redirected
 * (and 0 is returned) when this vnode is not the leader. All other cases dispatch on the message
 * type to the query worker, the meta accessors or TQ.
 *
 * @param pVnode target vnode
 * @param pMsg   message to process
 * @param pInfo  queue info; not used by this function
 * @return the handler's result, 0 after a redirect, or TSDB_CODE_VND_APP_ERROR for an unknown type
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");
  // the BATCH_META test must be a comparison; an assignment here would overwrite the message type
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
       pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
      !vnodeIsLeader(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg, true);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
352
  blockDebugShowDataBlocks(data, __func__);
353
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
354 355
}

D
dapan1121 已提交
356 357 358 359 360 361 362
/**
 * Fill the vnode-related fields of a table meta response: database id, vgroup id, timestamp
 * precision and database name, all taken from the vnode's configuration.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

S
Shengliang Guan 已提交
363
/**
 * Handle a trim request: decode it and run retention on the vnode's TSDB up to the requested time.
 *
 * @param pVnode  target vnode
 * @param version unused
 * @param pReq    serialized SVTrimDbReq
 * @param len     length of pReq in bytes
 * @param pRsp    unused
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be decoded, otherwise the
 *         result of tsdbDoRetention()
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  // log only after decoding, so that the requested timestamp (not the zero initializer) is shown
  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  // process
  return tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
}

L
Liu Jicong 已提交
383
/**
 * Handle a drop-TTL-table request: drop the tables selected by metaTtlDropTable() for the requested
 * timestamp and, if any uid was collected, remove those uids from the TQ uid list.
 *
 * @param pVnode  target vnode
 * @param version unused
 * @param pReq    serialized SVDropTtlTableReq
 * @param len     length of pReq in bytes
 * @param pRsp    unused
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when the uid array cannot be allocated; -1 (with
 *         terrno = TSDB_CODE_INVALID_MSG) when the request cannot be decoded; otherwise the non-zero
 *         result of metaTtlDropTable()
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // initialized up front so that every path reaching `end` returns a defined value
  int32_t           ret = -1;
  SVDropTtlTableReq ttlReq = {0};

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

end:
  taosArrayDestroy(tbUids);
  return ret;
}

407
/**
 * Handle a create-super-table request: decode it, create the super table in meta and run the
 * rollup-SMA creation step. The steps run in that order and stop at the first failure.
 *
 * On failure pRsp->code is set to terrno and the row/tag schema arrays of the request are freed.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  // short-circuit evaluation keeps the decode -> meta -> rsma order and stops at the first error
  bool failed = tDecodeSVCreateStbReq(&dec, &stbReq) < 0 ||
                metaCreateSTable(pVnode->pMeta, version, &stbReq) < 0 ||
                tdProcessRSmaCreate(pVnode->pSma, &stbReq) < 0;

  if (failed) {
    pRsp->code = terrno;
    taosMemoryFree(stbReq.schemaRow.pSchema);
    taosMemoryFree(stbReq.schemaTag.pSchema);
  }

  tDecoderClear(&dec);
  return failed ? -1 : 0;
}

446
/**
 * Handle a batch create-table request.
 *
 * Each request of the batch is checked against the time-series grant and the vnode's table hash,
 * then created in meta. One SVCreateTbRsp per processed request is collected and encoded into
 * pRsp. The uids of newly created tables are passed on to TQ and to the SMA uid store.
 *
 * @param pVnode  target vnode
 * @param version version passed to metaCreateTable()
 * @param pReq    serialized SVCreateTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont/contLen receive the encoded SVCreateTbBatchRsp
 * @return 0 on success, -1 (with terrno set) on decode failure, allocation failure or a failed
 *         grant check
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // declared here, not mid-function: every `goto _exit` must see an initialized encoder
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }

    // validate hash; the bounded format cannot write past tbName
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      // "already exists" counts as success when the request asked for IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
  // released here so that a mid-loop grant failure does not leak the uid store
  if (pStore != NULL) tdUidStoreFree(pStore);
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

540
/**
 * Handle an alter-super-table request: decode it and apply it through metaAlterSTable().
 *
 * A decode failure sets terrno to TSDB_CODE_INVALID_MSG; a meta failure copies terrno into
 * pRsp->code.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  int32_t        rc = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) < 0) {
    // decode req failed
    terrno = TSDB_CODE_INVALID_MSG;
    rc = -1;
  } else if (metaAlterSTable(pVnode->pMeta, version, &stbReq) < 0) {
    pRsp->code = terrno;
    rc = -1;
  }

  tDecoderClear(&dec);
  return rc;
}

569
/**
 * Handle a drop-super-table request: drop it from meta, remove the uids collected by
 * metaDropSTable() from the TQ uid list and run the rollup-SMA drop step.
 *
 * The outcome is reported through pRsp->code; the function itself always returns 0.
 *
 * @param pVnode  target vnode
 * @param version version passed to metaDropSTable()
 * @param pReq    serialized SVDropStbReq
 * @param len     length of pReq in bytes
 * @param pRsp    response; code receives the result
 * @return always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};
  SArray      *tbUidList = NULL;

  // this handler answers a DROP_STB request, so the response carries the matching type
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) {
    // an allocation failure must not be reported as success
    rcode = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  if (tbUidList) taosArrayDestroy(tbUidList);
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

612
/**
 * Handle an alter-table request: decode it, apply it through metaAlterTable() and encode an
 * SVAlterTbRsp into pRsp.
 *
 * Failures are reported inside the encoded response (vAlterTbRsp.code); the function itself always
 * returns 0. When metaAlterTable() succeeds and fills in a schema, the table meta is completed with
 * the vnode fields and attached to the response; the schema array is freed before returning.
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  alterReq = {0};
  SVAlterTbRsp  alterRsp = {0};
  STableMetaRsp metaRsp = {0};
  SDecoder      dec = {0};
  SEncoder      enc = {0};
  int32_t       ret;
  bool          altered = false;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVAlterTbReq(&dec, &alterReq) < 0) {
    // decode failed
    alterRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &alterReq, &metaRsp) < 0) {
    // processing failed
    alterRsp.code = terrno;
  } else {
    altered = true;
  }
  tDecoderClear(&dec);

  // only a successful alter hands the refreshed table meta back to the requester
  if (altered && NULL != metaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &metaRsp);
    alterRsp.pMeta = &metaRsp;
  }

  tEncodeSize(tEncodeSVAlterTbRsp, &alterRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&enc, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&enc, &alterRsp);
  tEncoderClear(&enc);

  if (metaRsp.pSchemas) {
    taosMemoryFree(metaRsp.pSchemas);
  }
  return 0;
}

662
/**
 * Handle a batch drop-table request.
 *
 * Every request of the batch is passed to metaDropTable(); one SVDropTbRsp per request is collected
 * and encoded into pRsp. The uids collected by metaDropTable() are removed from the TQ uid list.
 * Request-level failures are reported through pRsp->code; the function itself always returns 0.
 *
 * @param pVnode  target vnode
 * @param version version passed to metaDropTable()
 * @param pReq    serialized SVDropTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont/contLen receive the encoded SVDropTbBatchRsp
 * @return always 0
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // an allocation failure must not be reported as success
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      // a missing table counts as success when the request asked to ignore it
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // nothing to encode into; report the failure instead of writing through NULL
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }
  taosArrayDestroy(rsp.pArray);
  return 0;
}

722 723
/**
 * Debug helper: print every row of one submit block.
 *
 * The schema is fetched from meta using the iterator's suid and the schema version of the block's
 * first row, and is released before returning.
 *
 * @param pMeta   meta handle used to fetch the schema
 * @param pBlock  submit block to print
 * @param msgIter iterator positioned on pBlock; supplies suid and uid
 * @param tags    prefix printed in front of every row
 * @return 0 when the block has no rows, -1 when no schema is found, TSDB_CODE_SUCCESS otherwise
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter rowIter = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &rowIter);
  if (rowIter.row == NULL) return 0;

  // TODO: use the real schema
  STSchema *pTSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(rowIter.row));
  if (pTSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char prefix[128] = {0};
  snprintf(prefix, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);

  for (STSRow *pRow = tGetSubmitBlkNext(&rowIter); pRow != NULL; pRow = tGetSubmitBlkNext(&rowIter)) {
    tdSRowPrint(pRow, pTSchema, prefix);
  }

  taosMemoryFreeClear(pTSchema);

  return TSDB_CODE_SUCCESS;
}

757
/**
 * @brief Debug helper: walk all blocks of a submit request and print each one.
 *
 * @param pVnode vnode whose meta handle is used for schema lookups
 * @param pMsg   submit request to dump; must not be NULL
 * @param tags   prefix forwarded to the per-block printer
 * @return 0 once all blocks were visited, -1 if the message iterator fails
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SSubmitBlk    *pCurBlk = NULL;
  SMeta         *pMeta = pVnode->pMeta;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pCurBlk) < 0) {
      return -1;
    }
    if (pCurBlk == NULL) {
      break;  // iterator exhausted
    }

    // Printing is best-effort: a failure on one block does not stop the dump.
    vnodeDebugPrintSingleSubmitMsg(pMeta, pCurBlk, &iter, tags);
  }

  return 0;
}

774
/**
 * @brief Apply a submit (insert) request block by block and encode the response.
 *
 * For every block in the request the function optionally creates the target
 * table (auto-create mode, when the block carries an encoded SVCreateTbReq),
 * inserts the rows through tsdbInsertTableData(), and records a per-block
 * result in the response array. The encoded SSubmitRsp is placed in
 * pRsp->pCont / pRsp->contLen.
 *
 * @param pVnode  vnode receiving the data
 * @param version version stamped into the request and passed to meta/tsdb
 * @param pReq    raw request buffer, interpreted as SSubmitReq
 * @param len     length of pReq (unused here)
 * @param pRsp    response message; code, pCont and contLen are filled in
 * @return always 0; failures are reported through pRsp->code
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  int32_t        tsize = 0, ret = 0;
  SEncoder       encoder = {0};
  SArray        *newTbUids = NULL;
  terrno = TSDB_CODE_SUCCESS;

  pRsp->code = 0;
  pSubmitReq->version = version;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
  if (!submitRsp.pArray || !newTbUids) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    // A failing iterator must not be treated as "next block available".
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        taosArrayDestroy(createTbReq.ctb.tagName);
        goto _exit;
      }

      if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
        pRsp->code = terrno;
        tDecoderClear(&decoder);
        taosArrayDestroy(createTbReq.ctb.tagName);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // submitBlkRsp is discarded on this path, so the failure has to be
          // reported through the message-level code as well.
          submitBlkRsp.code = terrno;
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          taosArrayDestroy(createTbReq.ctb.tagName);
          goto _exit;
        }
      }
      taosArrayPush(newTbUids, &createTbReq.uid);

      submitBlkRsp.uid = createTbReq.uid;

      // Room for "<dbname>." + table name + terminating NUL. Only the
      // "<dbname>." prefix is written here.
      size_t fnameSize = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameSize);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        tDecoderClear(&decoder);
        taosArrayDestroy(createTbReq.ctb.tagName);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameSize, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

#ifdef TD_DEBUG_PRINT_ROW
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
      tDecoderClear(&decoder);
      taosArrayDestroy(createTbReq.ctb.tagName);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    // A per-block insert failure is recorded in the block response and the
    // remaining blocks are still processed.
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);

_exit:
  taosArrayDestroy(newTbUids);
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont != NULL) {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  } else {
    // No buffer to encode into: send an empty body and keep any earlier error code.
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // Each pushed block response owns its tblFName buffer.
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
  }

  // version is int64_t: use the fixed-width format macro instead of %ld.
  vDebug("successful submit in vg %d version %" PRIi64, pVnode->config.vgId, version);

  return 0;
}
906

907
/**
 * @brief Decode a create-TSMA request and pass it to tdProcessTSmaCreate().
 *
 * @param pVnode  vnode whose pSma handle processes the request
 * @param version version forwarded to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    optional response; when non-NULL its msgType/code/pCont/contLen
 *                are initialised and code is set to terrno on failure
 * @return 0 on success, -1 on decode or processing failure
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder;

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    // terrno is presumably set by tdProcessTSmaCreate() - confirm against its implementation
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // Note the space before "for table": without it the version and the text run together.
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
  return -1;
}
C
Cary Xu 已提交
943 944 945 946 947 948 949 950 951 952 953 954

/**
 * @brief Create a TSMA on this vnode (specific for smaDstVnode).
 *
 * Forwards to vnodeProcessCreateTSmaReq() with a fixed version of 1 and no
 * response message.
 *
 * @param pVnode  target vnode
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont in bytes
 * @return int32_t result of vnodeProcessCreateTSmaReq()
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  SRpcMsg *pNoRsp = NULL;  // this entry point produces no response message

  int32_t code = vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, pNoRsp);
  return code;
}
955 956 957 958 959 960 961 962 963

/**
 * @brief Acknowledge an alter-replica confirm message.
 *
 * Logs the event and fills pRsp with an empty, successful
 * TDMT_VND_ALTER_CONFIRM_RSP.
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // Empty body: the response carries only the type and a success code.
  pRsp->contLen = 0;
  pRsp->pCont = NULL;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
965

966
/**
 * @brief Placeholder handler for an alter-hashrange message.
 *
 * Currently only logs the request; the actual work is still to be done:
 *   1. stop work
 *   2. adjust hash range / compact / remove wals / rename vgroups
 *   3. reload sync
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  // todo: implement the steps listed in the header comment
  return 0;
}
H
Hongze Cheng 已提交
975

976 977 978 979 980 981 982 983 984
/**
 * @brief Apply an alter-vnode-config request.
 *
 * Only a change of cacheLastSize is acted upon here: the new value is stored
 * in the vnode config and the tsdb cache capacity is resized accordingly.
 *
 * @param pVnode vnode to reconfigure
 * @param pReq   serialized SAlterVnodeReq
 * @param len    length of pReq in bytes
 * @return 0 on success, TSDB_CODE_INVALID_MSG (also stored in terrno) when
 *         the request cannot be deserialized
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq alterReq = {0};

  if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
        alterReq.cacheLastSize);

  // Nothing to do when the requested size matches the current one.
  if (pVnode->config.cacheLastSize == alterReq.cacheLastSize) {
    return 0;
  }

  pVnode->config.cacheLastSize = alterReq.cacheLastSize;
  // TODO: save config
  // The multiplication by 1024 * 1024 is done in size_t.
  tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);

  return 0;
}

H
Hongze Cheng 已提交
993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
/**
 * @brief Apply a delete request: remove the [skey, ekey] range from every
 *        table uid listed in the decoded SDeleteRes.
 *
 * The uid list and the decoder are released on every path, including failures.
 *
 * @param pVnode  vnode whose tsdb receives the deletes
 * @param version version passed to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq in bytes
 * @param pRsp    unused
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be
 *         allocated; TSDB_CODE_INVALID_MSG if decoding fails; otherwise the
 *         first non-zero code returned by tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // Nothing has been acquired yet, so there is nothing to clean up.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, pReq, len);

  // Assumes tDecodeDeleteRes() follows the file's decoder convention of
  // returning a negative value on failure - confirm against its definition.
  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) break;  // stop at the first failure and report it
  }

_exit:
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}