vnodeSvr.c 27.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/*
 * Forward declarations of the write-request handlers dispatched from
 * vnodeProcessWriteReq(). All of them share one signature: the vnode, the
 * version of the request, the request body with its length, and the response
 * message to fill in.
 */

/* super-table handlers */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table handlers */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data handlers */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* vnode alter handlers */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
/* NOTE(review): "Hasn" (sic) is the spelling used at the call site, so it must stay in sync with it. */
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
30

31
/**
 * Pre-process a request by rewriting its message buffer in place.
 *
 *  - TDMT_VND_CREATE_TABLE: for every create-table request in the batch, a
 *    newly generated uid and the current timestamp (ms) are written into the
 *    buffer at the decoder position that follows the table name.
 *  - TDMT_VND_SUBMIT: for every submit block that carries a schema
 *    (msgIter.schemaLen > 0), the uid is looked up by table name (a new one is
 *    generated when the lookup returns 0), uid and timestamp are written into
 *    the schema bytes, and the uid is stored big-endian in pBlock->uid.
 *  - TDMT_VND_DELETE: the message is run through qWorkerProcessDeleteMsg()
 *    and pMsg->pCont/contLen are replaced by a freshly allocated buffer
 *    holding an SMsgHead followed by the encoded SDeleteRes.
 *  - Any other message type is left untouched.
 *
 * @param pVnode vnode the request is addressed to
 * @param pMsg   request to pre-process; its content may be modified or replaced
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      // initialized so the loop bound is well defined even if no count gets stored by the decode call
      int32_t nReqs = 0;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;
        tStartDecode(&dc);

        tDecodeI32v(&dc, NULL);
        tDecodeCStr(&dc, &name);
        // patch uid and ctime into the buffer at the current decode position
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        // only blocks that carry a create-table schema need a uid patched in
        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
          tDecodeCStr(&dc, &name);

          // reuse the uid of a table already known by that name, otherwise mint one
          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      int32_t     size;
      int32_t     ret;
      uint8_t    *pCont;
      SEncoder    coder = {0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // do not write a header through a NULL buffer; the original message stays untouched
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(&coder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(&coder, &res);
      tEncoderClear(&coder);

      // swap the request content for the encoded delete result
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

136 137 138 139 140
/**
 * Apply one write request to the vnode.
 *
 * Records `version` in pVnode->state.applied, dispatches the message to the
 * handler matching pMsg->msgType, pushes the message to TQ and finally commits
 * the vnode when vnodeShouldCommit() says so. TDMT_VND_COMMIT jumps straight
 * to the commit step, skipping the TQ push.
 *
 * @param pVnode  vnode to apply the request to
 * @param pMsg    request; the body handed to most handlers starts after SMsgHead
 * @param version version (index) of the request
 * @param pRsp    response message, filled in by the individual handlers
 * @return 0 on success, -1 if a handler or the TQ push failed
 */
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
    } break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler gets the whole message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                   pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                 pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    // NOTE(review): the results of the two alter handlers below are ignored - confirm this is intended
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      break;
    case TDMT_VND_COMMIT:
      // explicit commit request: jump into the commit block below
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

254
/**
 * Pre-process a message on its way to the query queue.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to
 * qWorkerPreprocessQueryMsg(); every other message type is accepted as is.
 *
 * @return 0 for message types that need no pre-processing, otherwise the
 *         result of qWorkerPreprocessQueryMsg()
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/**
 * Handle one message taken from the vnode query queue.
 *
 * Query and merge-query messages go to qWorkerProcessQueryMsg(), continue
 * messages to qWorkerProcessCQueryMsg(). Anything else is logged and rejected.
 *
 * @return the query worker's result, or TSDB_CODE_VND_APP_ERROR for an
 *         unknown message type
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  // read context handed to the query worker
  SReadHandle readHandle = {0};
  readHandle.meta = pVnode->pMeta;
  readHandle.config = &pVnode->config;
  readHandle.vnode = pVnode;
  readHandle.pMsgCb = &pVnode->msgCb;

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&readHandle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

277
/**
 * Handle one message taken from the vnode fetch queue.
 *
 * Scheduler messages (fetch, fetch response, cancel, drop, heartbeat) go to
 * the query worker, table meta/cfg requests to the vnode query helpers, and
 * consume/stream messages to TQ.
 *
 * @param pVnode vnode the message is addressed to
 * @param pMsg   message to process
 * @param pInfo  queue info; only its workerId is used, for TDMT_VND_CONSUME
 * @return the handler's result, or TSDB_CODE_VND_APP_ERROR for an unknown
 *         message type
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
323
  blockDebugShowDataBlocks(data, __func__);
324
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
325 326
}

D
dapan1121 已提交
327 328 329 330 331 332 333
/**
 * Fill the vnode-specific fields of a table meta response: database id,
 * vgroup id, timestamp precision and database name, all taken from the
 * vnode configuration.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  // NOTE(review): unbounded copy - assumes dbFName can hold config.dbname; confirm against the struct definitions
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

L
Liu Jicong 已提交
334
/**
 * Drop the tables selected by metaTtlDropTable() for the timestamp in the
 * request and remove their uids from the TQ table list.
 *
 * The request body is a single int32_t in network byte order.
 *
 * @param pVnode  vnode to operate on
 * @param version request version (unused here)
 * @param pReq    request body
 * @param len     length of the request body in bytes
 * @param pRsp    response message (not modified here)
 * @return the result of metaTtlDropTable(); -1 with terrno set to
 *         TSDB_CODE_INVALID_MSG when the body is too short;
 *         TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be allocated
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // refuse a body that cannot hold the int32_t read below
  if (len < (int32_t)sizeof(int32_t)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) {
    // the caller logs tstrerror(terrno), so keep it in sync with the returned code
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t t = ntohl(*(int32_t *)pReq);
  // informational only: this is the normal path, not an error
  vInfo("rec ttl time:%d", t);

  int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
  if (ret == 0 && taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

  taosArrayDestroy(tbUids);
  return ret;
}

353
/**
 * Handle a create-super-table request: decode it, create the super table in
 * the meta store and run the rollup-SMA creation step for it.
 *
 * On failure of any step pRsp->code is set to terrno.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec;
  int32_t        result = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  // the steps run in order and stop at the first failure (short-circuit)
  if (tDecodeSVCreateStbReq(&dec, &stbReq) < 0 ||             //
      metaCreateSTable(pVnode->pMeta, version, &stbReq) < 0 ||  //
      tdProcessRSmaCreate(pVnode, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    result = 0;
  }

  tDecoderClear(&dec);
  return result;
}

388
/**
 * Handle a batch create-table request.
 *
 * Every request in the batch is validated against the vnode hash range and
 * created in the meta store; the per-table result codes are collected and
 * encoded into pRsp as an SVCreateTbBatchRsp. The uids of newly created
 * tables are passed on to TQ and to the SMA uid store.
 *
 * @param pVnode  vnode to create the tables in
 * @param version request version
 * @param pReq    encoded SVCreateTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response message; pCont/contLen receive the encoded batch response
 * @return 0 on success, -1 (with terrno set) if the request cannot be decoded
 *         or memory cannot be allocated
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // declared up front: the early `goto _exit` paths must not jump over an initializer
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash; the bounded format keeps an over-long name from overrunning tbName
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      // "already exists" is not an error when the request asked for IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
  // the encoder is cleared exactly once, and only after it has been initialized
  tEncoderClear(&encoder);

_exit:
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  // single point where the decoder is released; it is initialized before any jump here
  tDecoderClear(&decoder);
  return rcode;
}

476
/**
 * Handle an alter-super-table request: decode it (it uses the same encoding
 * as a create request) and apply it through metaAlterSTable().
 *
 * A decode failure sets terrno to TSDB_CODE_INVALID_MSG; a meta failure
 * copies terrno into pRsp->code.
 *
 * @return 0 on success, -1 on failure
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder = {0};
  int32_t        rc = -1;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) < 0) {
    // request could not be decoded
    terrno = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterSTable(pVnode->pMeta, version, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    rc = 0;
  }

  tDecoderClear(&decoder);
  return rc;
}

505
/**
 * Handle a drop-super-table request: decode it and drop the super table from
 * the meta store.
 *
 * The outcome is reported only through pRsp->code (TSDB_CODE_INVALID_MSG on a
 * decode failure, terrno on a meta failure, TSDB_CODE_SUCCESS otherwise).
 *
 * @return always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  // NOTE(review): a drop request answering with the CREATE_STB response type looks like a
  // copy-paste slip - confirm which response type the receiver expects.
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    // decode request failed
    pRsp->code = TSDB_CODE_INVALID_MSG;
  } else if (metaDropSTable(pVnode->pMeta, version, &dropReq) < 0) {
    // process request failed
    pRsp->code = terrno;
  } else {
    pRsp->code = TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&dec);
  return 0;
}

534
/**
 * Handle an alter-table request.
 *
 * The request is decoded and applied through metaAlterTable(). The outcome is
 * carried inside the encoded SVAlterTbRsp placed in pRsp->pCont: its code is
 * TSDB_CODE_INVALID_MSG when the request cannot be decoded and the meta
 * store's error when the alter itself fails. When metaAlterTable() hands back
 * a schema, the vnode fields of the meta response are filled in and the meta
 * is attached to the response.
 *
 * @param pVnode  vnode owning the table
 * @param version request version
 * @param pReq    encoded SVAlterTbReq
 * @param len     length of pReq in bytes
 * @param pRsp    response message; pCont/contLen receive the encoded response
 * @return 0 once a response has been encoded (even when the alter failed),
 *         -1 with terrno set if the response buffer cannot be allocated
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  int32_t       ret;
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode failed
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    // report the real failure reason; fall back to INVALID_MSG if none was recorded
    vAlterTbRsp.code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_MSG;
  } else if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }
  tDecoderClear(&dc);

  // encode the response, also on the failure paths above
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never encode into a NULL buffer
    pRsp->contLen = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);

  // NOTE(review): vMetaRsp.pSchemas is not released here - confirm who owns it after metaAlterTable()
  return 0;
}

581
/**
 * Handle a batch drop-table request.
 *
 * Each table in the batch is dropped through metaDropTable(); the per-table
 * result codes are collected and encoded into pRsp as an SVDropTbBatchRsp.
 * The uids reported by metaDropTable() are removed from the TQ table list.
 * Request-level failures (undecodable request, out of memory) are reported
 * through pRsp->code and an encoded response is still produced.
 *
 * @param pVnode  vnode owning the tables
 * @param version request version
 * @param pReq    encoded SVDropTbBatchReq
 * @param len     length of pReq in bytes
 * @param pRsp    response message; pCont/contLen receive the encoded batch response
 * @return 0 once a response has been encoded, -1 with terrno set if the
 *         response buffer cannot be allocated
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  int32_t          rcode = 0;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // surface the allocation failure instead of answering with a success code
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      // a missing table is not an error when the request asked to ignore it
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);

  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never encode into a NULL buffer
    pRsp->contLen = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    rcode = -1;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the per-table result array is only needed until the response has been encoded
  taosArrayDestroy(rsp.pArray);
  return rcode;
}

640 641
/**
 * Debug helper: print every row of one submit block.
 *
 * The schema is looked up by msgIter->suid and the schema version of the
 * block's first row, then each row is printed with a tag of the form
 * "<tags>: uid <uid> ".
 *
 * @param pMeta   meta store used for the schema lookup
 * @param pBlock  submit block whose rows are printed
 * @param msgIter iterator positioned on pBlock; supplies suid and uid
 * @param tags    caller-supplied prefix for the printed lines
 * @return 0 when the block has no rows, -1 when no schema is found,
 *         TSDB_CODE_SUCCESS after printing
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  // no schema is cached between calls, so it is always fetched for this block
  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char rowTags[128] = {0};
  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

675
/**
 * Debug helper: print the rows of every block in a submit request.
 *
 * @param pVnode vnode whose meta store is used for schema lookups
 * @param pMsg   submit request to walk; must not be NULL
 * @param tags   prefix passed on to the per-block printer
 * @return 0 after all blocks were visited, -1 if the message iterator cannot
 *         be initialized or advanced
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);
  SSubmitMsgIter iter = {0};
  SMeta         *pMetaStore = pVnode->pMeta;
  SSubmitBlk    *pBlk = NULL;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) {
      return -1;
    }
    if (pBlk == NULL) {
      // no more blocks
      return 0;
    }

    vnodeDebugPrintSingleSubmitMsg(pMetaStore, pBlk, &iter, tags);
  }
}

692
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
693
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
694
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
695 696 697 698
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
699
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
700
  int32_t        nRows;
H
Hongze Cheng 已提交
701 702
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
703
  SArray        *newTbUids = NULL;
704
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
705 706

  pRsp->code = 0;
C
Cary Xu 已提交
707

C
Cary Xu 已提交
708 709 710 711
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
712 713 714 715 716
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
717
  // handle the request
H
Hongze Cheng 已提交
718 719 720
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
721 722
  }

C
Cary Xu 已提交
723 724
  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
725 726 727 728 729
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
730
  for (;;) {
H
Hongze Cheng 已提交
731 732 733
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
734 735
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
736 737
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
738 739
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
740 741
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
742
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
743
        tDecoderClear(&decoder);
H
Hongze Cheng 已提交
744 745 746 747 748
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
749
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
750
          tDecoderClear(&decoder);
H
Hongze Cheng 已提交
751 752 753
          goto _exit;
        }
      }
754
      taosArrayPush(newTbUids, &createTbReq.uid);
H
Hongze Cheng 已提交
755

H
Hongze Cheng 已提交
756
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
757
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
D
dapan 已提交
758
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
759

H
Hongze Cheng 已提交
760 761 762 763 764 765 766
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

wmmhello's avatar
wmmhello 已提交
767
#ifdef TD_DEBUG_PRINT_ROW
D
dapan 已提交
768
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
wmmhello's avatar
wmmhello 已提交
769
#endif
H
Hongze Cheng 已提交
770
      tDecoderClear(&decoder);
D
dapan1121 已提交
771 772 773
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
774 775
    }

H
Hongze Cheng 已提交
776
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
777
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
778 779
    }

H
Hongze Cheng 已提交
780 781 782
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
783
  }
784
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
785

H
Hongze Cheng 已提交
786
_exit:
787
  taosArrayDestroy(newTbUids);
H
Hongze Cheng 已提交
788 789 790 791 792 793 794 795
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
796
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
797 798 799
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
800

801
  // TODO: the partial success scenario and the error case
C
Cary Xu 已提交
802
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level 1/level 2.
803
  // TODO: refactor
C
Cary Xu 已提交
804
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
L
Liu Jicong 已提交
805
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
806
  }
C
Cary Xu 已提交
807

H
Hongze Cheng 已提交
808
  return 0;
L
Liu Jicong 已提交
809
}
810

811
/**
 * @brief Decode a create-tsma request and pass it to tdProcessTSmaCreate().
 *
 * @param pVnode  vnode whose SMA handle (pVnode->pSma) receives the request
 * @param version version forwarded to tdProcessTSmaCreate()
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    optional response; when non-NULL it is reset to an empty
 *                TDMT_VND_CREATE_SMA_RSP and its code mirrors the outcome
 * @return int32_t 0 on success, -1 on failure (terrno carries the reason)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder;

  // pRsp may be NULL, so every access to it below is guarded
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    // terrno is presumably set by tdProcessTSmaCreate() on failure -- TODO confirm
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // req is zero-initialized, so these fields are still printable when decoding failed part-way
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
  return -1;
}
C
Cary Xu 已提交
847 848 849 850 851 852 853 854 855 856 857 858

/**
 * @brief specific for smaDstVnode: run the create-tsma request handler
 *        without an rpc response object.
 *
 * @param pVnode  target vnode
 * @param pCont   encoded create-tsma request
 * @param contLen length of pCont in bytes
 * @return int32_t result of vnodeProcessCreateTSmaReq()
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  const int64_t fixedVersion = 1;  // this entry point always submits with version 1
  SRpcMsg      *pNoRsp = NULL;     // no response is produced on this path

  return vnodeProcessCreateTSmaReq(pVnode, fixedVersion, pCont, contLen, pNoRsp);
}
859 860 861 862 863 864 865 866 867

/**
 * @brief Handle an alter-replica confirm message: log it and fill pRsp with
 *        an empty, successful TDMT_VND_ALTER_CONFIRM_RSP.
 *
 * @param pVnode  vnode the message is addressed to (used for logging only)
 * @param version unused
 * @param pReq    unused
 * @param len     unused
 * @param pRsp    response to fill; dereferenced unconditionally
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // the reply carries no payload
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
869 870 871 872

/**
 * @brief Placeholder handler for the alter-hashrange message; it currently
 *        only logs the request and reports success.
 *
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t code = 0;

  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  // todo
  // 1. stop work
  // 2. adjust hash range / compact / remove wals / rename vgroups
  // 3. reload sync

  return code;
}
H
Hongze Cheng 已提交
879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906

/**
 * @brief Apply a delete request: decode an SDeleteRes from pReq and delete the
 *        [skey, ekey] range from every table uid it lists.
 *
 * @param pVnode  vnode whose tsdb (pVnode->pTsdb) is modified
 * @param version version forwarded to tsdbDeleteTableData()
 * @param pReq    encoded SDeleteRes
 * @param len     length of pReq in bytes
 * @param pRsp    not used by this handler
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY if the uid list cannot
 *         be allocated; TSDB_CODE_INVALID_MSG if the request cannot be decoded;
 *         otherwise the non-zero code returned by tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, pReq, len);

  // do not act on a partially decoded request
  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) goto _exit;  // stop at the first failing table and report its code
  }

_exit:
  // single cleanup point: success and failure both release the decoder and the uid list
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}