vnodeSvr.c 28.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/*
 * Forward declarations of the per-message write handlers.
 * All handlers share one signature: the vnode, the version passed by the
 * dispatcher, the request buffer and its length, and the response to fill.
 */

/* super table */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* vnode alteration */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
31

32
/*
 * Rewrite a write message in place before it is processed.
 *
 *  - TDMT_VND_CREATE_TABLE: for every request in the batch, writes a freshly
 *    generated uid and the current timestamp (ms) as two consecutive int64
 *    values at the decoder position reached after the leading int32 and the
 *    table name.
 *  - TDMT_VND_SUBMIT: for every block carrying a schema section, looks the
 *    table up by name (generating a uid when the lookup returns 0), patches
 *    uid and timestamp the same way, and stores the uid in the block header
 *    via htobe64().
 *  - TDMT_VND_DELETE: runs the delete through qWorkerProcessDeleteMsg() and
 *    replaces pMsg->pCont with a newly allocated buffer holding an SMsgHead
 *    followed by the encoded SDeleteRes.
 *
 * Other message types are left untouched.
 *
 * Returns 0 on success; otherwise the failing code, which is also logged.
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;  // keeps the loop bound defined even if the count is not decoded

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      tStartDecode(&dc);

      tDecodeI32v(&dc, &nReqs);
      for (int32_t iReq = 0; iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;

        tStartDecode(&dc);

        // skip the leading int32 and the table name; the uid/ctime slots follow
        tDecodeI32v(&dc, NULL);
        tDecodeCStr(&dc, &name);
        *(int64_t *)(dc.data + dc.pos) = uid;
        *(int64_t *)(dc.data + dc.pos + 8) = ctime;

        tEndDecode(&dc);
      }

      tEndDecode(&dc);
      tDecoderClear(&dc);
    } break;
    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();
      tb_uid_t       uid;

      tInitSubmitMsgIter(pSubmitReq, &msgIter);

      for (;;) {
        tGetSubmitMsgNext(&msgIter, &pBlock);
        if (pBlock == NULL) break;

        // only blocks with a schema section are patched
        if (msgIter.schemaLen > 0) {
          char *name = NULL;

          tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
          tStartDecode(&dc);

          tDecodeI32v(&dc, NULL);
          tDecodeCStr(&dc, &name);

          // use the uid found by name, or generate one when the lookup returns 0
          uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
          if (uid == 0) {
            uid = tGenIdPI64();
          }
          *(int64_t *)(dc.data + dc.pos) = uid;
          *(int64_t *)(dc.data + dc.pos + 8) = ctime;
          pBlock->uid = htobe64(uid);

          tEndDecode(&dc);
          tDecoderClear(&dc);
        }
      }

    } break;
    case TDMT_VND_DELETE: {
      // zero-initialised so they are never read indeterminate after tEncodeSize
      int32_t     size = 0;
      int32_t     ret = 0;
      uint8_t    *pCont;
      SEncoder   *pCoder = &(SEncoder){0};
      SDeleteRes  res = {0};
      SReadHandle handle = {
          .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

      code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
      if (code) goto _err;

      // malloc and encode
      tEncodeSize(tEncodeDeleteRes, &res, size, ret);
      pCont = rpcMallocCont(size + sizeof(SMsgHead));
      if (pCont == NULL) {
        // do not write the header through a NULL buffer; release the result and fail
        taosArrayDestroy(res.uidList);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      ((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
      ((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));

      tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
      tEncodeDeleteRes(pCoder, &res);
      tEncoderClear(pCoder);

      // the encoded result replaces the original message content
      rpcFreeCont(pMsg->pCont);
      pMsg->pCont = pCont;
      pMsg->contLen = size + sizeof(SMsgHead);

      taosArrayDestroy(res.uidList);
    } break;
    default:
      break;
  }

  return code;

_err:
  vError("vgId:%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

137
/*
 * Apply one write message to the vnode.
 *
 * Records `version` and the message's apply term in pVnode->state, dispatches
 * the message to the handler for its type, pushes the message to the TQ and
 * finally commits when vnodeShouldCommit() says so (TDMT_VND_COMMIT jumps
 * straight to the commit step and skips the TQ push).
 *
 * pVnode   target vnode
 * pMsg     message; its content starts with an SMsgHead
 * version  version recorded as pVnode->state.applied and passed to handlers
 * pRsp     response filled in by the handlers
 *
 * Returns 0 on success, -1 when a handler or the TQ push fails.
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;
  int32_t rc = 0;  // result of the handler selected below

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;
  pVnode->state.applyTerm = pMsg->info.conn.applyTerm;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      rc = vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_STB:
      rc = vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_DROP_STB:
      rc = vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_CREATE_TABLE:
      rc = vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_TABLE:
      rc = vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_DROP_TABLE:
      rc = vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      rc = vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_CREATE_SMA:
      rc = vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp);
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler receives the whole content, header included
      rc = vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp);
      break;
    case TDMT_VND_DELETE:
      rc = vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp);
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      rc = tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                pMsg->contLen - sizeof(SMsgHead));
      break;
    case TDMT_VND_MQ_VG_DELETE:
      rc = tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen);
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      rc = tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                    pMsg->contLen - sizeof(SMsgHead));
      break;
    case TDMT_STREAM_TASK_DEPLOY:
      rc = tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                                  pMsg->contLen - sizeof(SMsgHead));
      break;
    case TDMT_STREAM_TASK_DROP:
      rc = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen);
      break;
    /* results of the three alter handlers below are not examined */
    case TDMT_VND_ALTER_CONFIRM:
      vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      goto _do_commit;
    default:
      ASSERT(0);
      break;
  }

  if (rc < 0) goto _err;

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
  _do_commit:
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

257
/*
 * Pre-process a query-queue message.
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to
 * qWorkerPreprocessQueryMsg(); every other type returns 0 untouched.
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
    default:
      return 0;
  }
}

/*
 * Process a message taken from the vnode query queue.
 *
 * A TDMT_SCH_QUERY arriving on a non-leader vnode is redirected and 0 is
 * returned. Otherwise query / merge-query messages go to
 * qWorkerProcessQueryMsg() and query-continue messages go to
 * qWorkerProcessCQueryMsg(); their result is returned as is. Any other type
 * is logged and yields TSDB_CODE_VND_APP_ERROR.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_SCH_QUERY) {
    if (!vnodeIsLeader(pVnode)) {
      vnodeRedirectRpcMsg(pVnode, pMsg);
      return 0;
    }
  }

  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }

  if (pMsg->msgType == TDMT_SCH_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

285
/*
 * Process a message taken from the vnode fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META and TDMT_VND_TABLE_CFG are redirected
 * (returning 0) when this vnode is not the leader. Everything else is
 * dispatched by type to the query worker, the meta getters or the TQ, and the
 * callee's result is returned. Unknown types are logged and yield
 * TSDB_CODE_VND_APP_ERROR.
 *
 * pInfo is only read for its workerId, in the TDMT_VND_CONSUME case.
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  // leader-only requests
  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_CFG:
      if (!vnodeIsLeader(pVnode)) {
        vnodeRedirectRpcMsg(pVnode, pMsg);
        return 0;
      }
      break;
    default:
      break;
  }

  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    /* meta */
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg);
    /* TQ / stream */
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

S
slzhou 已提交
339
  blockDebugShowDataBlocks(data, __func__);
340
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
341 342
}

D
dapan1121 已提交
343 344 345 346 347 348 349
/*
 * Fill the vnode-specific fields of a table meta response: vgroup id,
 * database id, timestamp precision and database name.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  // NOTE(review): unbounded copy - assumes dbFName is at least as large as config.dbname; confirm
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}

L
Liu Jicong 已提交
350
/*
 * Handle a drop-by-TTL request.
 *
 * Reads a network-order int32 from the start of pReq, passes it to
 * metaTtlDropTable() and, when that succeeds and reports at least one uid,
 * forwards the uid list to tqUpdateTbUidList() with `false`.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be allocated,
 * otherwise the result of metaTtlDropTable().
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SArray *droppedUids = taosArrayInit(8, sizeof(int64_t));
  if (droppedUids == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t ttl = ntohl(*(int32_t *)pReq);
  vDebug("rec ttl time:%d", ttl);

  int32_t code = metaTtlDropTable(pVnode->pMeta, ttl, droppedUids);
  if (code == 0 && taosArrayGetSize(droppedUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, droppedUids, false);
  }

  taosArrayDestroy(droppedUids);
  return code;
}

369
/*
 * Handle a create-super-table request.
 *
 * Decodes the request, creates the super table in the meta store and runs
 * tdProcessRSmaCreate() on it. The first step that fails stops the sequence,
 * copies terrno into pRsp->code and makes the function return -1.
 * The response carries no body (pCont NULL, contLen 0).
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req; evaluation stops at the first failing step
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0 ||           //
      metaCreateSTable(pVnode->pMeta, version, &req) < 0 ||  //
      tdProcessRSmaCreate(pVnode, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  // shared cleanup for both outcomes
  taosMemoryFree(req.schemaRow.pSchema);
  taosMemoryFree(req.schemaTag.pSchema);
  tDecoderClear(&coder);
  return rcode;
}

408
/*
 * Handle a batch create-table request.
 *
 * Decodes the batch, then for every entry validates the table hash and
 * creates the table, collecting one SVCreateTbRsp per entry. An entry that
 * fails with TSDB_CODE_TDB_TABLE_ALREADY_EXIST while carrying
 * TD_CREATE_IF_NOT_EXISTS is reported as success. Uids of newly created
 * tables are forwarded to the TQ and to the SMA uid store, and the per-entry
 * results are encoded into pRsp->pCont.
 *
 * Returns 0 on success. Returns -1 with terrno set when the request cannot
 * be decoded or memory cannot be allocated; per-entry failures are reported
 * only through the encoded response.
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // Declared and initialised up front: the early `goto _exit` paths would
  // otherwise skip the initialiser and clear an indeterminate encoder.
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash (bounded formatting: never writes past tbName)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      // only tables created by this call are announced to SMA and TQ
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  // single cleanup point; every object here is valid (possibly empty) on all paths
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

496
/*
 * Handle an alter-super-table request.
 *
 * Decodes the request (same wire format as create-super-table) and applies
 * it with metaAlterSTable(). A decode failure sets terrno to
 * TSDB_CODE_INVALID_MSG; an alter failure copies terrno into pRsp->code.
 *
 * Returns 0 on success, -1 on either failure.
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    // decode req failed
    terrno = TSDB_CODE_INVALID_MSG;
    rcode = -1;
  } else if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  tDecoderClear(&dc);
  return rcode;
}

525
/*
 * Handle a drop-super-table request.
 *
 * Decodes the request and drops the super table from the meta store. The
 * outcome is reported only through pRsp->code (TSDB_CODE_INVALID_MSG on a
 * decode failure, terrno on a drop failure); the function itself always
 * returns 0.
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  SDecoder     decoder = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;

  // NOTE(review): a drop request answers with the CREATE_STB response type -
  // looks like a copy-paste; confirm whether a drop-specific type is intended.
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    // decode request failed
    rcode = TSDB_CODE_INVALID_MSG;
  } else if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
    // process request failed
    rcode = terrno;
  }

  // return rsp
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

554
/*
 * Handle an alter-table request.
 *
 * Decodes the request and applies it with metaAlterTable(). Either failure
 * is reported as TSDB_CODE_INVALID_MSG in the encoded SVAlterTbRsp. When
 * metaAlterTable() hands back schemas in its meta response, that response is
 * completed with vnodeUpdateMetaRsp() and attached to the reply.
 *
 * The SVAlterTbRsp is always encoded into pRsp->pCont and the function always
 * returns 0; pRsp->code stays TSDB_CODE_SUCCESS.
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  int32_t       ret;
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode failed
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    // process failed
    // NOTE(review): terrno from metaAlterTable is not propagated here - confirm this is intended
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

  // one decoder cleanup shared by all three outcomes
  tDecoderClear(&dc);

  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  return 0;
}

601
/*
 * Handle a batch drop-table request.
 *
 * Decodes the batch and drops every listed table, collecting one SVDropTbRsp
 * per entry. An entry that fails with TSDB_CODE_VND_TABLE_NOT_EXIST while
 * carrying igNotExists is reported as success. The uids collected by
 * metaDropTable() are forwarded to the TQ, and the per-entry results are
 * encoded into pRsp->pCont.
 *
 * Always returns 0. A decode failure or an allocation failure is reported
 * through pRsp->code and terrno, and an empty response is encoded.
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // report the failure instead of answering "success" with an empty result
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  // the per-entry results are encoded by now; release the array (it was leaked before)
  taosArrayDestroy(rsp.pArray);
  return 0;
}

660 661
/*
 * Debug helper: print every row of one submit block.
 *
 * Looks up the schema for the block's suid at the schema version of the
 * block's first row, then prints each row prefixed with `tags` and the uid
 * taken from msgIter.
 *
 * Returns 0 when the block has no rows, -1 when no schema is found, and
 * TSDB_CODE_SUCCESS after printing.
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTags[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  // The schema is fetched exactly once per call, so no caching state is needed.
  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));  // TODO: use the real schema
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

695
/*
 * Debug helper: print every block of a submit request.
 *
 * Walks the request with a submit-message iterator and prints each block
 * through vnodeDebugPrintSingleSubmitMsg(), whose result is not examined.
 *
 * Returns 0 once the iterator yields no further block, -1 when the iterator
 * cannot be initialised or advanced.
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SSubmitBlk    *pBlk = NULL;
  SMeta         *pMeta = pVnode->pMeta;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) return -1;

  do {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
    if (pBlk != NULL) {
      vnodeDebugPrintSingleSubmitMsg(pMeta, pBlk, &iter, tags);
    }
  } while (pBlk != NULL);

  return 0;
}

712
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
713
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
714
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
715 716 717 718
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
719
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
720
  int32_t        nRows;
H
Hongze Cheng 已提交
721 722
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
723
  SArray        *newTbUids = NULL;
724
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
725 726

  pRsp->code = 0;
C
Cary Xu 已提交
727

C
Cary Xu 已提交
728 729 730 731
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
732 733 734 735 736
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
737
  // handle the request
H
Hongze Cheng 已提交
738 739 740
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
741 742
  }

C
Cary Xu 已提交
743 744
  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
745 746 747 748 749
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
750
  for (;;) {
H
Hongze Cheng 已提交
751 752 753
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
754 755
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
756 757
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
758 759
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
760 761
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
762
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
763
        tDecoderClear(&decoder);
H
Hongze Cheng 已提交
764 765 766 767 768
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
769
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
770
          tDecoderClear(&decoder);
H
Hongze Cheng 已提交
771 772 773
          goto _exit;
        }
      }
774
      taosArrayPush(newTbUids, &createTbReq.uid);
H
Hongze Cheng 已提交
775

H
Hongze Cheng 已提交
776
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
777
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
D
dapan 已提交
778
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
779

H
Hongze Cheng 已提交
780 781 782 783 784 785 786
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

wmmhello's avatar
wmmhello 已提交
787
#ifdef TD_DEBUG_PRINT_ROW
D
dapan 已提交
788
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
wmmhello's avatar
wmmhello 已提交
789
#endif
H
Hongze Cheng 已提交
790
      tDecoderClear(&decoder);
D
dapan1121 已提交
791 792 793
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
794 795
    }

H
Hongze Cheng 已提交
796
    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
797
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
798 799
    }

H
Hongze Cheng 已提交
800 801 802
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
803
  }
804
  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
805

H
Hongze Cheng 已提交
806
_exit:
807
  taosArrayDestroy(newTbUids);
H
Hongze Cheng 已提交
808 809 810 811 812 813 814 815
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
816
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
817 818 819
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
820

821
  // TODO: the partial success scenario and the error case
H
Hongze Cheng 已提交
822 823
  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
  // 1/level 2.
824
  // TODO: refactor
C
Cary Xu 已提交
825
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
L
Liu Jicong 已提交
826
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
827
  }
C
Cary Xu 已提交
828

H
Hongze Cheng 已提交
829
  return 0;
L
Liu Jicong 已提交
830
}
831

832
/**
 * @brief Decode a create-TSMA request and forward it to the SMA module.
 *
 * @param pVnode  vnode whose pSma handle receives the request
 * @param version version forwarded to tdProcessTSmaCreate
 * @param pReq    encoded SVCreateTSmaReq buffer
 * @param len     length of pReq as handed to the decoder
 * @param pRsp    optional response message; may be NULL, in which case no response fields are written
 * @return 0 on success, -1 on failure (terrno is copied into pRsp->code when pRsp is given)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder;

  // The response is optional: vnodeProcessCreateTSma() calls in with pRsp == NULL.
  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  // req is zero-initialised, so the fields logged here are well defined even when decoding failed.
  tDecoderClear(&coder);
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
  return -1;
}
C
Cary Xu 已提交
868 869 870 871 872 873 874 875 876 877 878 879

/**
 * @brief specific for smaDstVnode: create a TSMA from an encoded request without producing an RPC response
 *
 * @param pVnode  vnode that handles the request
 * @param pCont   encoded create-TSMA request, passed through unchanged
 * @param contLen length of pCont, passed through as the request length
 * @return int32_t result of vnodeProcessCreateTSmaReq
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  const int64_t fixedVersion = 1;  // this entry point always submits with version 1
  SRpcMsg      *pNoRsp = NULL;     // no response message is filled on this path

  return vnodeProcessCreateTSmaReq(pVnode, fixedVersion, pCont, contLen, pNoRsp);
}
880 881 882 883 884 885 886 887 888

/**
 * @brief Handle an alter-replica confirm message: log it and fill pRsp with an empty success response.
 *
 * pVnode is only used for the vgId in the log line; version, pReq and len are not read.
 *
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));

  // success reply with no payload
  pRsp->contLen = 0;
  pRsp->pCont = NULL;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;

  return 0;
}
S
Shengliang Guan 已提交
890

891
/**
 * @brief Placeholder handler for the alter-hashrange message; currently it only logs and reports success.
 *
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t code = 0;

  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  /*
   * todo
   *   1. stop work
   *   2. adjust hash range / compact / remove wals / rename vgroups
   *   3. reload sync
   */
  return code;
}
H
Hongze Cheng 已提交
900

901 902 903 904 905 906 907 908 909 910 911 912
/**
 * @brief Deserialize an alter-vnode-config request and log the requested cache settings.
 *
 * Only the deserialization and the log line happen here; nothing in this function applies the values.
 *
 * @param pVnode  vnode, used for the vgId in the log line
 * @param version not read here
 * @param pReq    serialized SAlterVnodeReq
 * @param len     length of pReq
 * @param pRsp    not read here
 * @return int32_t 0 on success; TSDB_CODE_INVALID_MSG (also stored in terrno) when deserialization fails
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SAlterVnodeReq alterReq = {0};

  if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) == 0) {
    vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
          alterReq.cacheLastSize);
    return 0;
  }

  // request could not be deserialized
  terrno = TSDB_CODE_INVALID_MSG;
  return TSDB_CODE_INVALID_MSG;
}

H
Hongze Cheng 已提交
913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938
/**
 * @brief Decode a delete result message and delete the given time range from every listed table.
 *
 * @param pVnode  vnode whose pTsdb handle performs the deletes
 * @param version version forwarded to tsdbDeleteTableData
 * @param pReq    encoded SDeleteRes buffer
 * @param len     length of pReq as handed to the decoder
 * @param pRsp    not read here
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be allocated,
 *         TSDB_CODE_INVALID_MSG if the request cannot be decoded, otherwise the first non-zero
 *         code returned by tsdbDeleteTableData (remaining tables are not processed)
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, pReq, len);

  // Do not apply a request that failed to decode; the uid list / key range may be incomplete.
  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) break;
  }

_exit:
  // Shared cleanup for success and failure so the decoder and uid list are always released.
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}