/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tencode.h"
#include "tmsg.h"
#include "vnd.h"
#include "vnode.h"
#include "vnodeInt.h"

#include <string.h>

// Forward declarations of the per-message write handlers defined in this file.
// They all share one signature: the vnode, the version of the request, the
// request payload with its length, and the response message to fill in.
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
/*
 * Pre-process one encoded create-table request in place.
 *
 * Skips the flags field, decodes the table name, then overwrites the bytes at
 * the decoder's current position with the table uid (8 bytes) followed by the
 * creation time (8 bytes). The uid is the one metaGetTableEntryUidByName()
 * returns for the name; when that is 0 a new uid is taken from tGenIdPI64().
 *
 * pVnode  vnode whose meta handle is queried for the uid
 * pCoder  decoder positioned at the start of the request; its buffer is patched
 * ctime   creation time written into the request
 * pUid    optional; receives the uid on success
 *
 * Returns 0 on success or TSDB_CODE_INVALID_MSG when decoding fails.
 *
 * NOTE(review): the 16 patched bytes are not checked against the remaining
 * buffer length - confirm the decoder guarantees they are present.
 */
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
  int32_t code = 0;
  int32_t lino = 0;
  // Declared and initialized up front so no error path can jump over their initialization.
  char   *name = NULL;
  int64_t uid = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags (decoded only to advance the position)
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // name
  if (tDecodeCStr(pCoder, &name) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // uid
  uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
  if (uid == 0) {
    uid = tGenIdPI64();
  }
  // memcpy instead of a cast store: the position inside the byte buffer need not be 8-byte aligned.
  memcpy(pCoder->data + pCoder->pos, &uid, sizeof(uid));

  // ctime, stored right after the uid
  memcpy(pCoder->data + pCoder->pos + sizeof(uid), &ctime, sizeof(ctime));

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
    if (pUid) *pUid = uid;
  }
  return code;
}
/*
 * Pre-process a batch create-table message in place.
 *
 * Decodes the request count that follows the SMsgHead and runs
 * vnodePreprocessCreateTableReq() on each request with one shared creation
 * timestamp taken from taosGetTimestampMs().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the batch header cannot be
 * decoded, or the error code of the first request that fails.
 */
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int64_t  ctime = taosGetTimestampMs();
  SDecoder dc = {0};
  int32_t  nReqs = 0;

  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Leave through _exit like every other failure so the decoder is always cleared.
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
    code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
  return code;
}
extern int64_t tsMaxKeyByPrecision[];
/*
 * Pre-process and validate one table's data inside a submit request.
 *
 * When SUBMIT_REQ_AUTO_CREATE_TABLE is set, the embedded create-table request
 * is pre-processed first and the uid it yields is written over the uid field
 * of the submit data. The row timestamps are then checked against
 * [now - keep2, tsMaxKeyByPrecision[precision]], with "now" derived from ctime
 * (milliseconds) and scaled to the vnode's precision.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG on a decoding problem, or
 * TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE when a timestamp is outside the window.
 *
 * NOTE(review): row/column data is read straight from the buffer without
 * checking the remaining length - confirm the sender is trusted or add bounds
 * checks.
 */
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
  int32_t       code = 0;
  int32_t       lino = 0;
  int64_t       uid = 0;
  SSubmitTbData submitTbData = {0};

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    // Patch the uid field in place; memcpy because the position need not be 8-byte aligned.
    memcpy(pCoder->data + pCoder->pos, &uid, sizeof(uid));
    pCoder->pos += sizeof(int64_t);
  } else {
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // scan and check
  TSKEY now = ctime;
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    // Only the first column is read and checked here.
    SColData colData = {0};
    pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      TSKEY ts = ((TSKEY *)colData.pData)[iRow];
      if (ts < minKey || ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
  } else {
    uint64_t nRow;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    // The index has the same unsigned 64-bit type as the decoded count, so a
    // large count cannot overflow a signed 32-bit counter.
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;

      if (pRow->ts < minKey || pRow->ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
  }

  tEndDecode(pCoder);

_exit:
  return code;
}
/*
 * Pre-process a submit message.
 *
 * Checks that the SSubmitReq2Msg header carries version 1, then decodes the
 * number of per-table data blocks and runs vnodePreProcessSubmitTbData() on
 * each one, using one shared timestamp from taosGetTimestampMs().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for a malformed message, or the
 * error code of the first table block that fails.
 */
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  SDecoder *pCoder = &(SDecoder){0};

  // The message must be large enough to hold the header before its version is read.
  if ((int64_t)pMsg->contLen < (int64_t)sizeof(SSubmitReq2Msg)) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  uint64_t nSubmitTbData;
  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t ctime = taosGetTimestampMs();
  // Index type matches the decoded unsigned 64-bit count.
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d, failed to preprocess submit request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
           pMsg->msgType);
  }
  tDecoderClear(pCoder);
  return code;
}
/*
 * Pre-process a delete message.
 *
 * Hands the message to qWorkerProcessDeleteMsg() to obtain an SDeleteRes,
 * encodes that result behind a fresh SMsgHead, and replaces the message's
 * content buffer (the previous one is released with rpcFreeCont()).
 *
 * Returns 0 on success, the code from qWorkerProcessDeleteMsg() if it fails,
 * or TSDB_CODE_OUT_OF_MEMORY when the new buffer cannot be allocated; in the
 * failure cases pMsg is left untouched.
 */
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  int32_t    size = 0;
  int32_t    ret = 0;
  uint8_t   *pCont = NULL;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};

  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
  initStorageAPI(&handle.api);

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  if (code) goto _exit;

  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  pCont = rpcMallocCont(size + sizeof(SMsgHead));
  if (pCont == NULL) {
    // Never write through a failed allocation; still release the result below.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _free;
  }

  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  tEncodeDeleteRes(pCoder, &res);
  tEncoderClear(pCoder);

  // Swap the encoded result in for the original content.
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);

_free:
  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
/*
 * Dispatch a write message to its pre-processing routine.
 *
 * Create-table, submit and delete messages are pre-processed; every other
 * message type is passed through untouched.
 *
 * Returns 0 on success or the error code of the pre-processing routine, which
 * is also logged.
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_SUBMIT: {
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
    } break;
    case TDMT_VND_DELETE: {
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
    } break;
    default:
      break;
  }

  if (code) {
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
           pMsg->msgType);
  }
  return code;
}

/*
 * Apply one write message at version `ver` to the vnode.
 *
 * A request whose version is not above state.applied is rejected with
 * terrno = TSDB_CODE_VND_DUP_REQUEST. Otherwise state.applied/applyTerm are
 * advanced, the message is dispatched by type to the meta / tsdb / tq /
 * stream handlers, the WAL is told the applied version, the message is pushed
 * to TQ, and a commit is triggered when the handler path requested one.
 *
 * pVnode  target vnode
 * pMsg    request; handlers receive either the payload after the SMsgHead
 *         (pReq/len) or the whole content (pCont/contLen), as written per case
 * ver     version of this request; asserted to be state.applied + 1
 * pRsp    response message passed through to the handlers
 *
 * Returns 0 on success and -1 on failure.
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;
  bool    needCommit = false;

  if (ver <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
           pVnode->state.applied);
    terrno = TSDB_CODE_VND_DUP_REQUEST;
    return -1;
  }

  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);

  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
  ASSERT(pVnode->state.applied + 1 == ver);

  // The applied version/term are published before the message is dispatched.
  atomic_store_64(&pVnode->state.applied, ver);
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);

  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
  }

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // The submit handler receives the whole content, header included.
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_BATCH_DEL:
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_TMQ_SUBSCRIBE:
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DELETE_SUB:
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_SEEK_TO_OFFSET:
      if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_ADD_CHECKINFO:
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DEL_CHECKINFO:
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      // Deploy/pause/resume are only executed once the vnode is marked restored.
      if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_PAUSE: {
      if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_RESUME: {
      if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_CHECK_RSP: {
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_VND_ALTER_CONFIRM:
      // Read hashChange before the handler runs; it decides whether to commit afterwards.
      needCommit = pVnode->config.hashChange;
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      needCommit = true;
      break;
    case TDMT_VND_CREATE_INDEX:
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_DROP_INDEX:
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_COMPACT:
      // Compact returns right away: no WAL apply, TQ push or commit afterwards.
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
      goto _exit;
    default:
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
  }

  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
         ver);

  walApplyVer(pVnode->pWal, ver);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (needCommit) {
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
    if (vnodeAsyncCommit(pVnode) < 0) {
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }

    // start a new one
    if (vnodeBegin(pVnode) < 0) {
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
  }

_exit:
  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), ver);
  return -1;
}

/*
 * Pre-process a query message.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to
 * qWorkerPreprocessQueryMsg(); the last argument is true for TDMT_SCH_QUERY.
 * Every other message type returns 0 without any work.
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
    return 0;
  }

  return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
}

/*
 * Process a message taken from the vnode's query queue.
 *
 * TDMT_SCH_QUERY, TDMT_VND_TMQ_CONSUME and TDMT_VND_TMQ_CONSUME_PUSH are
 * redirected (returning 0) when syncIsReadyForRead() reports false, and
 * TDMT_VND_TMQ_CONSUME is also redirected with TSDB_CODE_SYN_RESTORING while
 * the vnode is not restored. Otherwise the message is dispatched by type.
 *
 * Returns the handler's result, 0 after a redirect, or TSDB_CODE_APP_ERROR
 * for an unknown message type.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");
  if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
       pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) &&
      !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
  initStorageAPI(&handle.api);

  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_CONTINUE:
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
    case TDMT_VND_TMQ_CONSUME_PUSH:
      return tqProcessPollPush(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}

/*
 * Process a message taken from the vnode's fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META, TDMT_VND_TABLE_CFG and
 * TDMT_VND_BATCH_META are redirected (returning 0) when syncIsReadyForRead()
 * reports false. Otherwise the message is dispatched by type to the query
 * worker, the meta getters or the tq/stream handlers.
 *
 * pInfo is not used by this function.
 *
 * Returns the handler's result, 0 after a redirect, or TSDB_CODE_APP_ERROR
 * for an unknown message type.
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
       pMsg->msgType == TDMT_VND_BATCH_META) &&
      !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    // case TDMT_SCH_CANCEL_TASK:
    //   return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg, true);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
      //    case TDMT_VND_TMQ_CONSUME:
      //      return tqProcessPollReq(pVnode->pTq, pMsg);
    case TDMT_VND_TMQ_VG_WALINFO:
      return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}

void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
C
Cary Xu 已提交
599
  // blockDebugShowDataBlocks(data, __func__);
600
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
601 602
}

/*
 * Fill the vnode-level fields of a table-meta response: database name,
 * database id, vgroup id and timestamp precision, all taken from the vnode's
 * configuration. A NULL pMetaRsp is ignored.
 *
 * NOTE(review): strcpy() is unbounded - confirm dbFName is at least as large
 * as config.dbname.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  if (NULL == pMetaRsp) {
    return;
  }

  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}

extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
extern int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now);

/*
 * Handle a trim request: deserialize the SVTrimDbReq and run
 * vnodeSyncRetention() with the timestamp it carries.
 *
 * ver and pRsp are not used by this handler.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
 * otherwise the result of vnodeSyncRetention().
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  code = vnodeSyncRetention(pVnode, trimReq.timestamp);

_exit:
  return code;
}

/*
 * Handle a drop-TTL-table request.
 *
 * Deserializes the request, calls metaTtlDropTable() with its timestamp to
 * collect the uids of the dropped tables, passes a non-empty uid list to
 * tqUpdateTbUidList(), and finally runs vnodeSyncRetention() (its result is
 * not inspected).
 *
 * ver and pRsp are not used by this handler.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the uid array cannot be created, -1
 * with terrno = TSDB_CODE_INVALID_MSG when the request cannot be
 * deserialized, otherwise the result of metaTtlDropTable().
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // Initialized before any goto so the value returned at `end` is always defined.
  int32_t           ret = 0;
  SVDropTtlTableReq ttlReq = {0};

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    ret = -1;
    goto end;
  }

  vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
  ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

  vnodeSyncRetention(pVnode, ttlReq.timestamp);

end:
  taosArrayDestroy(tbUids);
  return ret;
}

/*
 * Handle a create-super-table request.
 *
 * Decodes the SVCreateStbReq, creates the super table with
 * metaCreateSTable() and then calls tdProcessRSmaCreate(). The response is
 * prepared as TDMT_VND_CREATE_STB_RSP with no content; when a step fails,
 * pRsp->code is set from terrno.
 *
 * Returns 0 on success and -1 when any step fails.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int32_t        ret = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  ret = 0;

_exit:
  // Success and failure both release the decoder here.
  tDecoderClear(&coder);
  return ret;
}

/**
 * @brief Handle a batch create-table request.
 *
 * Decodes the batch, creates every table in turn and collects one
 * SVCreateTbRsp per request. Uids of the tables that were actually created
 * are handed to the TQ and SMA modules, and the batch response is encoded
 * into pRsp->pCont.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to metaCreateTable
 * @param pReq   encoded SVCreateTbBatchReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message; pCont/contLen are filled on success
 * @return 0 when the batch was processed, -1 on decode, allocation or grant
 *         failure (terrno is set in those cases)
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    memset(&cRsp, 0, sizeof(cRsp));

    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }

    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

    // validate hash; the name comes from the decoded request, so the write is bounded to the buffer
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
      // "already exists" is not an error when the request asked for IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
    // NOTE(review): rcode is left at 0 here, so the caller sees success with an empty response — confirm intent.
    goto _exit;
  }

  // prepare rsp
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosMemoryFree(pCreateReq->comment);
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
  // Released here rather than on the success path only, so that every early exit frees the store as well.
  tdUidStoreFree(pStore);
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

801
/**
 * @brief Handle an alter-super-table request.
 *
 * Decodes the request (same wire format as create-stb) and applies it to the
 * meta store.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to metaAlterSTable
 * @param pReq   encoded request buffer
 * @param len    length of pReq in bytes
 * @param pRsp   response message; code carries terrno when the meta update fails
 * @return 0 on success; -1 if decoding failed (terrno = TSDB_CODE_INVALID_MSG)
 *         or the meta update failed
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  int32_t        result = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) < 0) {
    // a decode failure is reported through terrno only; pRsp->code is left untouched
    terrno = TSDB_CODE_INVALID_MSG;
    result = -1;
  } else if (metaAlterSTable(pVnode->pMeta, ver, &stbReq) < 0) {
    pRsp->code = terrno;
    result = -1;
  }

  tDecoderClear(&dec);
  return result;
}

830
/**
 * @brief Handle a drop-super-table request.
 *
 * Decodes the request, drops the super table from the meta store, removes the
 * collected table uids from the TQ table list and lets the SMA module process
 * the drop.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to metaDropSTable
 * @param pReq   encoded SVDropStbReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message; the outcome is reported through pRsp->code
 * @return always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};
  SArray      *tbUidList = NULL;

  // Response type of the drop request (was copy-pasted from the create-stb handler).
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) {
    // do not report success when nothing was dropped
    rcode = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  if (tbUidList) taosArrayDestroy(tbUidList);
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

873
/**
 * @brief Handle an alter-table request.
 *
 * Decodes the request, applies it to the meta store and always encodes an
 * SVAlterTbRsp into pRsp->pCont. A decode or meta failure is reported through
 * the code field of the encoded response, not through the return value.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to metaAlterTable
 * @param pReq   encoded SVAlterTbReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message; pCont/contLen receive the encoded response
 * @return always 0
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  alterReq = {0};
  SVAlterTbRsp  alterRsp = {0};
  STableMetaRsp metaRsp = {0};
  SDecoder      dec = {0};
  SEncoder      enc = {0};
  int32_t       ret;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVAlterTbReq(&dec, &alterReq) < 0) {
    // decode failed
    alterRsp.code = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dec);
  } else if (metaAlterTable(pVnode->pMeta, ver, &alterReq, &metaRsp) < 0) {
    // meta update failed; terrno is captured before the decoder is torn down
    alterRsp.code = terrno;
    tDecoderClear(&dec);
  } else {
    tDecoderClear(&dec);

    // attach the refreshed table meta only when the alter produced one
    if (NULL != metaRsp.pSchemas) {
      vnodeUpdateMetaRsp(pVnode, &metaRsp);
      alterRsp.pMeta = &metaRsp;
    }
  }

  // encode the response for both the success and the failure outcome
  tEncodeSize(tEncodeSVAlterTbRsp, &alterRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&enc, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&enc, &alterRsp);
  tEncoderClear(&enc);

  if (metaRsp.pSchemas) {
    taosMemoryFree(metaRsp.pSchemas);
  }
  return 0;
}

923
/**
 * @brief Handle a batch drop-table request.
 *
 * Decodes the batch, drops every table in turn and records one SVDropTbRsp
 * per request. The uids collected by metaDropTable are removed from the TQ
 * and SMA table lists, and the batch response is encoded into pRsp->pCont on
 * every path.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to metaDropTable
 * @param pReq   encoded SVDropTbBatchReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message; code is set on decode or allocation failure
 * @return always 0
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  SArray          *tbUids = NULL;
  STbUidStore     *pStore = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // report the failure the same way the decode error above does, instead of answering "success"
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
    tb_uid_t     tbUid = 0;

    ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
    if (ret < 0) {
      // "not exists" is not an error when the request asked to ignore it
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  tdUpdateTbUidList(pVnode->pSma, pStore, false);

_exit:
  taosArrayDestroy(tbUids);
  tdUidStoreFree(pStore);
  tDecoderClear(&decoder);
  // the response is encoded on every path, including the failures above
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  taosArrayDestroy(rsp.pArray);
  return 0;
}

988 989
/**
 * @brief Debug helper: print every row of one submit block.
 *
 * Looks up the schema by the super-table uid and the schema version of the
 * first row, then prints each row prefixed with the caller's tag and the
 * table uid.
 *
 * @param pMeta   meta store used to look up the schema
 * @param pBlock  submit block whose rows are printed
 * @param msgIter message iterator positioned on pBlock; supplies suid and uid
 * @param tags    caller-supplied prefix for the printed lines
 * @return 0 if the block has no rows, -1 if no schema was found,
 *         TSDB_CODE_SUCCESS otherwise
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char rowTags[128] = {0};
  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
// Working state used while converting a version-0 SSubmitReq into an SSubmitReq2.
typedef struct SSubmitReqConvertCxt {
  SSubmitMsgIter msgIter;     // iterator over the blocks of the source message
  SSubmitBlk    *pBlock;      // block most recently returned by tGetSubmitMsgNext
  SSubmitBlkIter blkIter;     // iterator over the rows of pBlock
  STSRow        *pRow;        // row most recently returned by tGetSubmitBlkNext
  STSRowIter     rowIter;     // column iterator, initialised with pTbSchema and reset for each row
  SSubmitTbData *pTbData;     // table data being built for the current block
  STSchema      *pTbSchema;   // schema fetched with metaGetTbTSchema for the current block
  SArray        *pColValues;  // SColVal array, one entry per schema column, passed to tRowBuild
} SSubmitReqConvertCxt;

/**
 * @brief Prepare the conversion context for the block msgIter currently points at.
 *
 * Reloads the table schema (looked up by suid when it is positive, otherwise
 * by uid), re-initialises the row iterator, resets the table-data object and
 * rebuilds the column-value array with one COL_VAL_NONE entry per schema column.
 *
 * @param pMeta meta store used for the schema lookup
 * @param pCxt  conversion context to reset
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG when no schema is found,
 *         or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  SSubmitMsgIter *pIter = &pCxt->msgIter;

  // schema of the table the current block belongs to
  taosMemoryFreeClear(pCxt->pTbSchema);
  pCxt->pTbSchema = metaGetTbTSchema(pMeta, pIter->suid > 0 ? pIter->suid : pIter->uid, pIter->sversion, 1);
  if (NULL == pCxt->pTbSchema) {
    return TSDB_CODE_INVALID_MSG;
  }
  tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);

  // table data: release what the previous block left behind, allocate the object on first use
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  if (NULL == pCxt->pTbData) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pCxt->pTbData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SSubmitTbData *pTb = pCxt->pTbData;
  pTb->flags = 0;
  pTb->suid = pIter->suid;
  pTb->uid = pIter->uid;
  pTb->sver = pIter->sversion;
  pTb->pCreateTbReq = NULL;
  pTb->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (NULL == pTb->aRowP) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // column values: one slot per schema column, all starting out as NONE
  STSchema *pSchema = pCxt->pTbSchema;
  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  if (NULL == pCxt->pColValues) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (int32_t iCol = 0; iCol < pSchema->numOfCols; ++iCol) {
    SColVal val = COL_VAL_NONE(pSchema->columns[iCol].colId, pSchema->columns[iCol].type);
    taosArrayPush(pCxt->pColValues, &val);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release everything the conversion context still owns.
 *
 * @param pCxt conversion context; the struct itself is not freed
 */
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  taosArrayDestroy(pCxt->pColValues);
  taosMemoryFreeClear(pCxt->pTbSchema);

  // destroy the contents of the table data first, then the object itself
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  taosMemoryFreeClear(pCxt->pTbData);
}

/**
 * @brief Convert one cell of an STSRow into an SColVal.
 *
 * NONE and NULL cells only set the flag. Variable-length values reference the
 * cell's data without copying; float and double values are copied bit-for-bit
 * into value.val; all other types go through GET_TYPED_DATA.
 *
 * @param pCol     schema column describing the cell's type
 * @param pCellVal source cell
 * @param pColVal  destination column value
 * @return TSDB_CODE_SUCCESS
 */
static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
    return TSDB_CODE_SUCCESS;
  }

  if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pColVal->value.nData = varDataLen(pCellVal->val);
    pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
  } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
    float f = GET_FLOAT_VAL(pCellVal->val);
    memcpy(&pColVal->value.val, &f, sizeof(f));
  } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
    // Copy the raw 8 bytes instead of dereferencing through an int64_t cast:
    // memcpy has no alignment or effective-type requirements on the source.
    memcpy(&pColVal->value.val, pCellVal->val, sizeof(int64_t));
  } else {
    GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
  }

  pColVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Fill pCxt->pColValues from the current row (pCxt->pRow).
 *
 * Walks the schema columns in order and converts each fetched cell into the
 * matching pColValues slot. Stops at the first column the row iterator cannot
 * fetch, or at the first conversion error.
 *
 * @param pCxt conversion context holding the row, schema and value array
 * @return TSDB_CODE_SUCCESS or the first conversion error
 */
static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  const int32_t nCols = pCxt->pTbSchema->numOfCols;

  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
  for (int32_t iCol = 0; iCol < nCols; ++iCol) {
    STColumn *pCol = &pCxt->pTbSchema->columns[iCol];
    SCellVal  cell = {0};

    if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cell)) {
      break;
    }

    int32_t code = vnodeCellValConvertToColVal(pCol, &cell, (SColVal *)taosArrayGet(pCxt->pColValues, iCol));
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Decode the create-table request embedded in the current block, if any.
 *
 * A block carries such a request when msgIter.schemaLen is positive; the
 * request is decoded from the first schemaLen bytes of the block's data into
 * a newly allocated pTbData->pCreateTbReq.
 *
 * @param pCxt conversion context positioned on the block
 * @return TSDB_CODE_SUCCESS when there is nothing to decode,
 *         TSDB_CODE_OUT_OF_MEMORY, or the result of tDecodeSVCreateTbReq
 */
static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  const int32_t reqLen = pCxt->msgIter.schemaLen;
  if (reqLen <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SVCreateTbReq *pCreateReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  pCxt->pTbData->pCreateTbReq = pCreateReq;
  if (NULL == pCreateReq) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t *)pCxt->pBlock->data, reqLen);
  int32_t rc = tDecodeSVCreateTbReq(&dec, pCreateReq);
  tDecoderClear(&dec);

  return rc;
}

/**
 * @brief Convert a version-0 SSubmitReq into an SSubmitReq2.
 *
 * Iterates over the blocks of pReq. For each block the table context is
 * reset, an embedded create-table request is decoded if present, every row is
 * rebuilt with tRowBuild against the table schema, and the resulting table
 * data is appended to pReq2->aSubmitTbData.
 *
 * @param pVnode vnode whose meta store supplies the table schemas
 * @param pReq   source request
 * @param pReq2  destination request; aSubmitTbData is allocated here and is
 *               not released by this function when an error is returned
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;  // no more blocks
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        if (NULL == pNewRow) {
          // the slot could not be reserved; do not hand a NULL destination to tRowBuild
          code = TSDB_CODE_OUT_OF_MEMORY;
        } else {
          code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
        }
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // the array now holds a copy of the struct, so only the shell is freed and
      // the next block starts with a fresh object
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}

/**
 * @brief Encode an SSubmitReq2 into a newly allocated message buffer.
 *
 * @param pSubmitReq request to encode
 * @param ppMsg      receives the buffer on success and is left untouched on
 *                   failure; the caller releases it with taosMemoryFree
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or an encoding error
 */
static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
    tEncoderClear(&encoder);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  } else {
    // the buffer is not handed to the caller on failure, so release it here
    taosMemoryFree(pMsg);
  }
  return code;
}

1200
/**
 * @brief Handle a data submit (insert) request.
 *
 * Steps:
 *  1. Obtain an SSubmitReq2: version-0 messages are converted and re-encoded,
 *     newer messages are decoded after the SSubmitReq2Msg header.
 *  2. Validate every table entry: timestamps must lie within
 *     [now - keep2, max key] and be strictly increasing.
 *  3. Validate table existence, suid and schema version, and the shape of
 *     column-format data.
 *  4. For each table: create it if a create-table request is attached, then
 *     insert its data.
 *  5. Encode the SSubmitRsp2, update statistics and release resources.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to the meta and tsdb calls
 * @param pReq   request message buffer
 * @param len    length of pReq in bytes
 * @param pRsp   response message; code/pCont/contLen are filled here
 * @return 0 on success, otherwise the error code (also stored in terrno)
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t code = 0;
  terrno = 0;

  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
  SArray      *newTbUids = NULL;
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;

  void           *pAllocMsg = NULL;
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
  if (0 == pMsg->version) {
    // version-0 message: convert it, then re-encode so that pReq refers to the new format
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
    if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      tDecoderClear(&dc);  // the decoder must be cleared on the failure path as well
      goto _exit;
    }
    tDecoderClear(&dc);
  }

  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // the first column's data is read as the timestamp keys
      SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
      TSKEY    *aKey = (TSKEY *)(pColData->pData);

      for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
        if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
          code = TSDB_CODE_INVALID_MSG;
          // log the actual error: terrno was reset to 0 at function entry
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
      }

    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
          code = TSDB_CODE_INVALID_MSG;
          // log the actual error: terrno was reset to 0 at function entry
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
      }
    }
  }

  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // for a table that has a super table, the schema version is taken from the super table's info
      if (info.suid) {
        metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // the first column must be the non-empty primary timestamp column
      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // every column must hold the same number of values
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
  }

  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

  // loop to handle
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
      if (pCreateTbRsp == NULL) {
        // the slot could not be reserved; do not dereference it below
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      // create table
      if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
        // create table success

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
        terrno = 0;
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
      }
    }

    // insert data
    int32_t affectedRows;
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
    if (code) goto _exit;

    pSubmitRsp->affectedRows += affectedRows;
  }

  // update the affected table uid list
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
  }

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSSubmitRsp2(&ec, pSubmitRsp);
  tEncoderClear(&ec);

  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
    tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
  }

  // clear
  taosArrayDestroy(newTbUids);
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);

  if (code) terrno = code;

  taosMemoryFree(pAllocMsg);

  return code;
}
1429

1430
/**
 * @brief Handle a create-TSMA request.
 *
 * Decodes the request and hands it to the SMA module.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to tdProcessTSmaCreate
 * @param pReq   encoded SVCreateTSmaReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message; may be NULL, in which case no response
 *               fields are written
 * @return 0 on success, -1 on failure (terrno is set to
 *         TSDB_CODE_MSG_DECODE_ERROR when decoding fails)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, ver, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // format fixed: a space was missing between the version number and "for table"
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
  return -1;
}
C
Cary Xu 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477

/**
 * @brief Create a TSMA directly, specific for smaDstVnode.
 *
 * Thin wrapper around vnodeProcessCreateTSmaReq that passes a fixed version
 * of 1 and no response message.
 *
 * @param pVnode  vnode the TSMA is created on
 * @param pCont   encoded create-TSMA request
 * @param contLen length of pCont in bytes
 * @return int32_t the result of vnodeProcessCreateTSmaReq (0 on success, -1 on failure)
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
1478

1479
/**
 * @brief Trim table meta after the vnode's hash range has been altered.
 *
 * Logs the configured hash range and delegates to metaTrimTables.
 *
 * @param pVnode vnode whose meta store is trimmed
 * @param ver    apply index, used for logging only
 * @return the result of metaTrimTables
 */
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
  return metaTrimTables(pVnode->pMeta);
}

1491
/**
 * @brief Handle an alter-confirm request.
 *
 * When a hash-range change is pending (config.hashChange), consolidates it
 * and clears the flag on success. The response carries the resulting code.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed through to vnodeConsolidateAlterHashRange
 * @param pReq   unused
 * @param len    unused
 * @param pRsp   response message; msgType/code/pCont/contLen are filled here
 * @return TSDB_CODE_SUCCESS, or the negative code from the consolidation
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));

  int32_t code = TSDB_CODE_SUCCESS;
  if (pVnode->config.hashChange) {
    code = vnodeConsolidateAlterHashRange(pVnode, ver);
    if (code < 0) {
      vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
    } else {
      // the pending change is cleared only once it has been consolidated
      pVnode->config.hashChange = false;
    }
  }

  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = code;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return code;
}
S
Shengliang Guan 已提交
1513

1514
/**
 * @brief Apply an alter-vnode-config request to the running vnode.
 *
 * The request is deserialized and every setting that differs from the current
 * vnode configuration is copied into pVnode->config. Side effects per group:
 *  - cacheLastSize: the tsdb cache capacity is updated immediately.
 *  - pages: the meta cache is altered first; on failure the function returns
 *    early (settings handled before this point stay applied).
 *  - wal settings: walAlter() is called once at the end if any of them changed.
 *  - keep0/keep1/keep2: tsdbSetKeepCfg() is called once at the end if any of
 *    them changed and the vnode is not an rsma vnode.
 *  - sttTrigger / minRows: a request value of -1 leaves the setting untouched.
 *
 * @param pVnode vnode to reconfigure
 * @param ver    not used by this handler
 * @param pReq   serialized SAlterVnodeConfigReq
 * @param len    length of pReq
 * @param pRsp   not used by this handler
 * @return int32_t 0 on success, TSDB_CODE_INVALID_MSG if the request cannot be
 *         deserialized, or the errno value captured when metaAlterCache() fails
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  bool walChanged = false;
  bool tsdbChanged = false;

  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
        "walRetentionSize:%d",
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
        req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }

  // req.buffer is scaled by 1024 * 1024; compute the new size once and reuse it
  const int64_t newSzBuf = req.buffer * 1024LL * 1024LL;
  if (pVnode->config.szBuf != newSzBuf) {
    // arguments are cast so that they match the PRId64 conversions exactly
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode),
          (int64_t)pVnode->config.szBuf, newSzBuf);
    pVnode->config.szBuf = newSzBuf;
  }

  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
      // Snapshot errno before logging: the log call may itself change errno,
      // which would make the returned value differ from the one reported.
      // NOTE(review): confirm that metaAlterCache() reports through errno and
      // not terrno; if errno is 0 here the caller sees success.
      int32_t code = errno;
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
             pVnode->config.szCache, req.pages, tstrerror(code));
      return code;
    } else {
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
      pVnode->config.szCache = req.pages;
    }
  }

  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
  }

  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
    walChanged = true;
  }

  // keep settings are always stored, but only non-rsma vnodes push them to tsdb
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  // -1 means "not specified in the request"
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    pVnode->config.sttTrigger = req.sttTrigger;
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

  if (walChanged) {
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
  }

  return 0;
}

1617
/**
 * @brief Handle a batch delete request (one time range per table name).
 *
 * The request is decoded into a list of single delete requests. For each of
 * them the table is looked up by name in the meta; unknown tables are skipped,
 * and for known tables the rows in [startTs, endTs] are deleted from tsdb.
 * A failing delete is logged and recorded in terrno, but processing continues
 * with the next entry (best effort).
 *
 * @param pVnode vnode the deletes are applied to
 * @param ver    version passed to tsdbDeleteTableData()
 * @param pReq   encoded SBatchDeleteReq
 * @param len    length of pReq
 * @param pRsp   not used by this handler
 * @return int32_t 0 once the list has been processed, -1 (terrno set to
 *         TSDB_CODE_INVALID_MSG) if the request cannot be decoded
 */
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // zero-initialized so that cleanup is well defined even if decoding stops early
  SBatchDeleteReq deleteReq = {0};
  SDecoder        decoder = {0};

  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSBatchDeleteReq(&decoder, &deleteReq) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    if (deleteReq.deleteReqs != NULL) {
      taosArrayDestroy(deleteReq.deleteReqs);
    }
    tDecoderClear(&decoder);
    return -1;
  }

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);

  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
      continue;
    }

    int64_t uid = mr.me.uid;

    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    if (code < 0) {
      // best effort: remember and report the failure, then go on with the next table
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

    // release the per-entry decoder state before the reader is reused
    tDecoderClear(&mr.coder);
  }

  metaReaderClear(&mr);
  taosArrayDestroy(deleteReq.deleteReqs);
  // pair tDecoderInit() with tDecoderClear(), as the other handlers in this file do
  tDecoderClear(&decoder);
  return 0;
}

1651
/**
 * @brief Handle a delete request and build the delete response.
 *
 * The request carries a decoded SDeleteRes: a list of table uids plus a time
 * range [skey, ekey]. The range is deleted from every listed table; the first
 * failing delete aborts the loop. On success the response body is an encoded
 * SVDeleteRsp carrying the affected row count taken from the request.
 *
 * The decoder and the uid list are released on every path after they have
 * been set up, including the error paths.
 *
 * @param pVnode vnode the delete is applied to
 * @param ver    version passed to tsdbDeleteTableData()
 * @param pReq   encoded SDeleteRes
 * @param len    length of pReq
 * @param pRsp   response to fill in; header fields are always set, the body
 *               only on success
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY if an allocation
 *         fails, TSDB_CODE_INVALID_MSG if the request cannot be decoded, or
 *         the code returned by tsdbDeleteTableData()
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  int32_t     ret = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};
  SVDeleteRsp rsp = {0};
  SEncoder    ec = {0};

  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to clean up
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, pReq, len);
  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));

  int32_t nUid = taosArrayGetSize(pRes->uidList);
  for (int32_t iUid = 0; iUid < nUid; iUid++) {
    code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
                               pRes->skey, pRes->ekey);
    if (code) goto _exit;
  }

  // build the response body
  rsp.affectedRows = pRes->affectedRows;
  tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // do not hand out a non-zero length together with a NULL body
    pRsp->contLen = 0;
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVDeleteRsp(&ec, &rsp);
  tEncoderClear(&ec);

_exit:
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
1693
/**
 * @brief Handle a create-index request on a super table.
 *
 * The request is decoded as an SVCreateStbReq and handed to
 * metaAddIndexToSTable(). The response carries no body; its code is set to
 * terrno when the meta operation fails.
 *
 * @param pVnode vnode owning the meta
 * @param ver    version passed to metaAddIndexToSTable()
 * @param pReq   encoded SVCreateStbReq
 * @param len    length of pReq
 * @param pRsp   response to fill in
 * @return int32_t 0 on success, -1 on decode failure (terrno set to
 *         TSDB_CODE_INVALID_MSG) or when the meta operation fails
 */
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t        status = 0;
  SVCreateStbReq createReq = {0};
  SDecoder       reqDecoder = {0};

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&reqDecoder, pReq, len);

  if (tDecodeSVCreateStbReq(&reqDecoder, &createReq) < 0) {
    // malformed request: reported through terrno only, pRsp->code is left as is
    terrno = TSDB_CODE_INVALID_MSG;
    status = -1;
  } else if (metaAddIndexToSTable(pVnode->pMeta, ver, &createReq) < 0) {
    pRsp->code = terrno;
    status = -1;
  }

  // single release point for the decoder, whatever the outcome
  tDecoderClear(&reqDecoder);
  return status;
}
1719
/**
 * @brief Handle a drop-index request on a super table.
 *
 * The request is deserialized into an SDropIndexReq and handed to
 * metaDropIndexFromSTable(). The response carries no body; its code is set to
 * terrno when the meta operation fails.
 *
 * @param pVnode vnode owning the meta
 * @param ver    version passed to metaDropIndexFromSTable()
 * @param pReq   serialized drop-index request
 * @param len    length of pReq
 * @param pRsp   response to fill in
 * @return int32_t TSDB_CODE_SUCCESS on success, -1 if the request cannot be
 *         deserialized (terrno set to TSDB_CODE_INVALID_MSG) or the meta
 *         operation fails
 */
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t       status = TSDB_CODE_SUCCESS;
  SDropIndexReq dropReq = {0};

  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSDropIdxReq(pReq, len, &dropReq)) {
    // malformed request: reported through terrno only, pRsp->code is left as is
    terrno = TSDB_CODE_INVALID_MSG;
    status = -1;
  } else if (metaDropIndexFromSTable(pVnode->pMeta, ver, &dropReq) < 0) {
    pRsp->code = terrno;
    status = -1;
  }

  return status;
}
1737

1738
// Implementation hook for compact-vnode requests. This file only defines it
// when TD_ENTERPRISE is not set (as a stub); otherwise the definition is
// presumably supplied by the enterprise build -- NOTE(review): confirm.
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
1739

1740 1741
/**
 * @brief Handle a compact-vnode request by delegating to
 *        vnodeProcessCompactVnodeReqImpl() with the arguments unchanged.
 *
 * @return int32_t the value returned by the implementation hook
 */
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t result = vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
  return result;
}
1743

H
Hongze Cheng 已提交
1744
#ifndef TD_ENTERPRISE
1745
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1746
  return 0;
dengyihao's avatar
dengyihao 已提交
1747
}
H
Hongze Cheng 已提交
1748
#endif