vnodeSvr.c 56.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tencode.h"
#include "tmsg.h"
H
Hongze Cheng 已提交
18
#include "vnd.h"
H
Hongze Cheng 已提交
19 20
#include "vnode.h"
#include "vnodeInt.h"
H
Hongze Cheng 已提交
21

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
/*
 * Forward declarations of the per-message write handlers. They all share one
 * signature: the vnode, the version passed in by the caller, the request
 * buffer and its length, and the response message the handler may fill in.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
39

40
/*
 * Patch one encoded create-table request in place.
 *
 * Decodes the request up to and including the table name, then overwrites the
 * 8 bytes at the current decoder position with the table uid and the following
 * 8 bytes with `btime`. The uid is the value metaGetTableEntryUidByName()
 * returns for the name, or a freshly generated id when that lookup yields 0.
 *
 * pUid may be NULL; when non-NULL it receives the uid on success.
 * Returns 0 on success and TSDB_CODE_INVALID_MSG when decoding fails.
 *
 * NOTE(review): the remaining capacity of the decoder buffer is not checked
 * before the 16 bytes are written -- presumably the encoder always reserves
 * them; confirm against the request encoder.
 */
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
  int32_t code = 0;
  int32_t lino = 0;
  // Declared before the first goto so both are initialized on every path to _exit.
  char   *name = NULL;
  int64_t uid = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags (value not needed here, only skipped)
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // name
  if (tDecodeCStr(pCoder, &name) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // uid: reuse the uid already registered under this name, otherwise generate one
  uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
  if (uid == 0) {
    uid = tGenIdPI64();
  }

  // The patch position follows a variable-length string, so it is not
  // guaranteed to be 8-byte aligned; copy bytes instead of storing through a
  // cast pointer.
  memcpy(pCoder->data + pCoder->pos, &uid, sizeof(uid));

  // btime
  memcpy(pCoder->data + pCoder->pos + sizeof(int64_t), &btime, sizeof(btime));

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
    if (pUid) *pUid = uid;
  }
  return code;
}
H
Hongze Cheng 已提交
83 84 85 86
/*
 * Pre-process a batched create-table message.
 *
 * Skips the SMsgHead, reads the number of requests and runs
 * vnodePreprocessCreateTableReq() on each one, using a single timestamp taken
 * at entry as the `btime` for the whole batch.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the envelope cannot be
 * decoded, or the first non-zero code returned for an individual request.
 */
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t  btime = taosGetTimestampMs();
  SDecoder dc = {0};
  int32_t  nReqs = 0;

  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    // Leave through _exit like every other failure so the decoder is always cleared.
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
    code = vnodePreprocessCreateTableReq(pVnode, &dc, btime, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
  return code;
}
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

/*
 * Pre-process an alter-table message: decode it with
 * tDecodeSVAlterTbReqSetCtime(), passing the current time in milliseconds as
 * the ctime to set.
 *
 * Returns 0 on success and TSDB_CODE_INVALID_MSG when decoding fails.
 */
static int32_t vnodePreProcessAlterTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t      lino = 0;  // never updated here; kept only for the log format
  int32_t      code = 0;
  SVAlterTbReq vAlterTbReq = {0};
  SDecoder     dc = {0};

  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  int64_t ctimeMs = taosGetTimestampMs();
  if (tDecodeSVAlterTbReqSetCtime(&dc, &vAlterTbReq, ctimeMs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
  }

  tDecoderClear(&dc);

  if (code != 0) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
    return code;
  }

  vTrace("vgId:%d %s done, table:%s ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, vAlterTbReq.tbName,
         ctimeMs);
  return code;
}

H
Hongze Cheng 已提交
139
extern int64_t tsMaxKeyByPrecision[];
140
/*
 * Pre-process one per-table entry of a submit request, in place.
 *
 * - With SUBMIT_REQ_AUTO_CREATE_TABLE set, the embedded create-table request is
 *   pre-processed first and the uid it yields is written into the uid slot of
 *   the submit data.
 * - Every row timestamp is checked against [minKey, maxKey], where minKey is
 *   `btimeMs` (scaled to the vnode precision) minus keep2 minutes and maxKey
 *   comes from tsMaxKeyByPrecision[].
 * - Finally `ctimeMs` is written at the position following the row data.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for undecodable input, or
 * TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE when a timestamp is outside the range.
 *
 * NOTE(review): row and column payloads are walked without checking the
 * decoder's remaining capacity -- confirm the message is validated upstream.
 */
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SSubmitTbData submitTbData = {0};
  int64_t       uid = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    // Overwrite the uid slot with the uid resolved for the auto-created table.
    // The offset is arbitrary, so copy bytes rather than store through a cast.
    memcpy(pCoder->data + pCoder->pos, &uid, sizeof(uid));
    pCoder->pos += sizeof(int64_t);
  } else {
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // scan and check
  TSKEY now = btimeMs;
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    // Only the first column is range-checked; it must consist purely of values.
    SColData colData = {0};
    pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }

    // Skip over the remaining columns.
    for (uint64_t i = 1; i < nColData; i++) {
      pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    // The counter has the same type as nRow: a 32-bit signed counter would
    // overflow before reaching a row count above INT32_MAX.
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;

      if (pRow->ts < minKey || pRow->ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        goto _exit;
      }
    }
  }

  // ctime slot follows the row data
  memcpy(pCoder->data + pCoder->pos, &ctimeMs, sizeof(ctimeMs));
  pCoder->pos += sizeof(int64_t);

  tEndDecode(pCoder);

_exit:
  return code;
}
/*
 * Pre-process a submit message.
 *
 * Verifies that the SSubmitReq2Msg header carries version 1, then runs
 * vnodePreProcessSubmitTbData() on every per-table entry, passing one
 * timestamp taken here as both btimeMs and ctimeMs.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG for a malformed envelope, or the
 * first non-zero code returned for a per-table entry.
 */
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  SDecoder *pCoder = &(SDecoder){0};
  uint64_t  nSubmitTbData = 0;

  // The header is read below; make sure the message is large enough to hold it.
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SSubmitReq2Msg)) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t btimeMs = taosGetTimestampMs();
  int64_t ctimeMs = btimeMs;
  // Counter matches the type of the decoded count (no signed/unsigned mix).
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, btimeMs, ctimeMs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d, failed to preprocess submit request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
           pMsg->msgType);
  }
  tDecoderClear(pCoder);
  return code;
}
H
Hongze Cheng 已提交
282

H
Hongze Cheng 已提交
283 284 285
/*
 * Pre-process a delete message.
 *
 * Runs the delete through qWorkerProcessDeleteMsg() to obtain an SDeleteRes,
 * stamps it with the current time in milliseconds, encodes it behind a fresh
 * SMsgHead and replaces the content of pMsg with that buffer (the previous
 * content is released with rpcFreeCont()).
 *
 * Returns 0 on success, the code reported by qWorkerProcessDeleteMsg() when it
 * fails, TSDB_CODE_INVALID_MSG when the result size cannot be computed, or
 * TSDB_CODE_OUT_OF_MEMORY when the new buffer cannot be allocated. On any
 * failure pMsg is left untouched.
 */
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  int32_t    size = 0;
  int32_t    ret = 0;
  uint8_t   *pCont = NULL;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};

  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
  initStorageAPI(&handle.api);

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  if (code) goto _exit;

  res.ctimeMs = taosGetTimestampMs();

  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  if (ret < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _cleanup;
  }

  pCont = rpcMallocCont(size + sizeof(SMsgHead));
  if (pCont == NULL) {
    // Do not dereference a failed allocation; keep the original message intact.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _cleanup;
  }

  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  tEncodeDeleteRes(pCoder, &res);
  tEncoderClear(pCoder);

  // swap the encoded result in for the original request body
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);

_cleanup:
  // Reached only after qWorkerProcessDeleteMsg() succeeded, exactly like the
  // original release point.
  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
H
Hongze Cheng 已提交
319

320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
/*
 * Pre-process a batch-delete message: decode it with
 * tDecodeSBatchDeleteReqSetCtime(), passing the current time in milliseconds
 * as the ctime to set, then release the decoded request list.
 *
 * Returns 0 on success and TSDB_CODE_INVALID_MSG when decoding fails.
 */
static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t         lino = 0;  // never updated here; kept only for the log format
  int64_t         ctimeMs = taosGetTimestampMs();
  SBatchDeleteReq pReq = {0};
  SDecoder        decoder = {0};

  tDecoderInit(&decoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  int32_t code = (tDecodeSBatchDeleteReqSetCtime(&decoder, &pReq, ctimeMs) < 0) ? TSDB_CODE_INVALID_MSG : 0;

  tDecoderClear(&decoder);
  taosArrayDestroy(pReq.deleteReqs);

  if (code == 0) {
    vTrace("vgId:%d %s done, ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, ctimeMs);
  } else {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
345 346 347 348 349 350 351
/*
 * Dispatch a write message to its pre-processing routine.
 *
 * Create-table, alter-table, submit, delete and batch-delete messages are
 * pre-processed; every other message type passes through with code 0.
 * A non-zero result is logged and returned to the caller.
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE:
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
      break;
    case TDMT_VND_ALTER_TABLE:
      code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
      break;
    case TDMT_VND_SUBMIT:
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
      break;
    case TDMT_VND_DELETE:
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
      break;
    case TDMT_VND_BATCH_DEL:
      code = vnodePreProcessBatchDeleteMsg(pVnode, pMsg);
      break;
    default:
      // nothing to pre-process for this message type
      break;
  }

  if (code != 0) {
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
           pMsg->msgType);
  }

  return code;
}

376
/*
 * Apply one write message at version `ver`.
 *
 * Steps, in order:
 *  1. Reject the message (terrno = TSDB_CODE_VND_DUP_REQUEST, return -1) when
 *     `ver` is not greater than the already applied version.
 *  2. Record `ver` and the message's apply term in pVnode->state.
 *  3. Return 0 right away when syncUtilUserCommit() reports false for the
 *     message type, or when tqCheckLogInWal() reports true for a
 *     stream-recover / task-check-rsp message.
 *  4. Dispatch on the message type to the matching handler.
 *  5. Mark the version applied in the WAL, push the message to TQ and, when
 *     required, commit asynchronously and begin a new vnode transaction.
 *
 * Returns 0 on success and -1 on failure. The `_err` path logs
 * tstrerror(terrno), so handlers are expected to have set terrno.
 */
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
  void   *pReq = NULL;
  int32_t len = 0;
  // Declared before the first goto so no jump crosses its initialization.
  bool needCommit = false;

  if (ver <= pVnode->state.applied) {
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
           pVnode->state.applied);
    terrno = TSDB_CODE_VND_DUP_REQUEST;
    return -1;
  }

  vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);

  ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
  ASSERT(pVnode->state.applied + 1 == ver);

  atomic_store_64(&pVnode->state.applied, ver);
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);

  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
  }

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TTL_TABLE:
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_TRIM:
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler receives the whole message, header included
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_BATCH_DEL:
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_TMQ_SUBSCRIBE:
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DELETE_SUB:
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_SEEK_TO_OFFSET:
      if (tqProcessSeekReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_ADD_CHECKINFO:
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_TMQ_DEL_CHECKINFO:
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
      break;
    case TDMT_STREAM_TASK_DEPLOY: {
      // deploy/pause/resume are only forwarded once the vnode is restored
      if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_DROP: {
      if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_PAUSE: {
      if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_RESUME: {
      if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
        goto _err;
      }
    } break;
    case TDMT_STREAM_TASK_CHECK_RSP: {
      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
        goto _err;
      }
    } break;
    case TDMT_VND_ALTER_CONFIRM:
      // sample hashChange before the handler runs
      needCommit = pVnode->config.hashChange;
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
        goto _err;
      }
      break;
    case TDMT_VND_ALTER_CONFIG:
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_COMMIT:
      needCommit = true;
      break;
    case TDMT_VND_CREATE_INDEX:
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_DROP_INDEX:
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
      break;
    case TDMT_VND_COMPACT:
      // compact leaves immediately: no WAL apply, TQ push or commit afterwards
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
      goto _exit;
    default:
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
      return -1;
  }

  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
         ver);

  walApplyVer(pVnode->pWal, ver);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (needCommit) {
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
    if (vnodeAsyncCommit(pVnode) < 0) {
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }

    // start a new one
    if (vnodeBegin(pVnode) < 0) {
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
      goto _err;
    }
  }

_exit:
  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), ver);
  return -1;
}

568
/*
 * Pre-process a message from the query queue.
 *
 * Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are forwarded to
 * qWorkerPreprocessQueryMsg(); its last argument tells it whether the message
 * is a plain TDMT_SCH_QUERY. Any other message type returns 0 untouched.
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  bool isQuery = (TDMT_SCH_QUERY == pMsg->msgType);
  bool isMergeQuery = (TDMT_SCH_MERGE_QUERY == pMsg->msgType);

  if (isQuery || isMergeQuery) {
    return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, isQuery);
  }

  return 0;
}

/*
 * Process one message from the vnode query queue.
 *
 * TDMT_SCH_QUERY, TDMT_VND_TMQ_CONSUME and TDMT_VND_TMQ_CONSUME_PUSH are
 * redirected (with terrno) when syncIsReadyForRead() reports false, and
 * TDMT_VND_TMQ_CONSUME is redirected with TSDB_CODE_SYN_RESTORING while the
 * vnode is not restored; both cases return 0. Otherwise the message goes to
 * the matching query or TQ handler and that handler's result is returned.
 * Unknown message types yield TSDB_CODE_APP_ERROR.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  // Evaluated first so syncIsReadyForRead() is only consulted for these types.
  bool gatedOnSync = pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
                     pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH;
  if (gatedOnSync && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (!pVnode->restored && pMsg->msgType == TDMT_VND_TMQ_CONSUME) {
    vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    return 0;
  }

  SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
  initStorageAPI(&handle.api);

  int32_t code;
  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      code = qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
      break;
    case TDMT_SCH_QUERY_CONTINUE:
      code = qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
      break;
    case TDMT_VND_TMQ_CONSUME:
      code = tqProcessPollReq(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TMQ_CONSUME_PUSH:
      code = tqProcessPollPush(pVnode->pTq, pMsg);
      break;
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      code = TSDB_CODE_APP_ERROR;
      break;
  }
  return code;
}

609
/*
 * Process one message from the vnode fetch queue.
 *
 * TDMT_SCH_FETCH, TDMT_VND_TABLE_META, TDMT_VND_TABLE_CFG and
 * TDMT_VND_BATCH_META are redirected (with terrno) and answered with 0 when
 * syncIsReadyForRead() reports false. Otherwise the message goes to the
 * matching query-worker, meta or TQ handler and that handler's result is
 * returned. Unknown message types yield TSDB_CODE_APP_ERROR.
 *
 * pInfo is not used by this function.
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);

  // Evaluated first so syncIsReadyForRead() is only consulted for these types.
  bool gatedOnSync = pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
                     pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META;
  if (gatedOnSync && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // TDMT_SCH_CANCEL_TASK and TDMT_VND_TMQ_CONSUME have no case here; their
  // dispatch entries were commented out in the original code.
  int32_t code;
  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      code = qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
      break;
    case TDMT_SCH_FETCH_RSP:
      code = qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
      break;
    case TDMT_SCH_DROP_TASK:
      code = qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
      break;
    case TDMT_SCH_QUERY_HEARTBEAT:
      code = qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
      break;

    /* table meta */
    case TDMT_VND_TABLE_META:
      code = vnodeGetTableMeta(pVnode, pMsg, true);
      break;
    case TDMT_VND_TABLE_CFG:
      code = vnodeGetTableCfg(pVnode, pMsg, true);
      break;
    case TDMT_VND_BATCH_META:
      code = vnodeGetBatchMeta(pVnode, pMsg);
      break;

    /* TQ / stream */
    case TDMT_VND_TMQ_VG_WALINFO:
      code = tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_TASK_RUN:
      code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_TASK_DISPATCH:
      code = tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
      break;
    case TDMT_STREAM_TASK_CHECK:
      code = tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      code = tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_RETRIEVE:
      code = tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_RETRIEVE_RSP:
      code = tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
      code = tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_RECOVER_FINISH:
      code = tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
      break;
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      code = tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
      break;

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      code = TSDB_CODE_APP_ERROR;
      break;
  }
  return code;
}

void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
C
Cary Xu 已提交
665
  // blockDebugShowDataBlocks(data, __func__);
666
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
667 668
}

D
dapan1121 已提交
669
/*
 * Fill the database-level fields of a table-meta response (db name, db id,
 * vgroup id and timestamp precision) from the vnode configuration.
 * A NULL pMetaRsp is ignored.
 *
 * NOTE(review): the db name is copied with strcpy(), which assumes dbFName is
 * at least as large as config.dbname -- confirm against the struct definitions.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  if (pMetaRsp != NULL) {
    strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
    pMetaRsp->dbId = pVnode->config.dbId;
    pMetaRsp->vgId = TD_VID(pVnode);
    pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  }
}

H
Hongze Cheng 已提交
680 681
extern int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now);

682
/*
 * Handle a trim request: deserialize it and run vnodeSyncRetention() with the
 * timestamp carried in the request.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
 * otherwise the result of vnodeSyncRetention(). `ver` and `pRsp` are unused.
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  return vnodeSyncRetention(pVnode, trimReq.timestamp);
}

700
/*
 * Handle a drop-ttl-table request.
 *
 * Deserializes the request, asks metaTtlDropTable() to drop tables as of the
 * request timestamp (converted from seconds to milliseconds) while collecting
 * their uids, forwards any collected uids to tqUpdateTbUidList(), and finally
 * runs vnodeSyncRetention().
 *
 * Returns:
 *   TSDB_CODE_OUT_OF_MEMORY when the uid array cannot be allocated;
 *   -1 with terrno = TSDB_CODE_INVALID_MSG when the request cannot be decoded;
 *   the non-zero result of metaTtlDropTable() when it fails;
 *   0 otherwise.
 * `ver` and `pRsp` are unused.
 */
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // Declared and initialized before any `goto end`, so the value returned at
  // `end` is always defined.
  int32_t           ret = 0;
  SVDropTtlTableReq ttlReq = {0};

  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    ret = -1;  // the caller treats a negative result as failure and reports terrno
    goto end;
  }

  vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
  ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
  if (ret != 0) {
    goto end;
  }
  if (taosArrayGetSize(tbUids) > 0) {
    tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  }

  vnodeSyncRetention(pVnode, ttlReq.timestampSec);

end:
  taosArrayDestroy(tbUids);
  return ret;
}

726
/*
 * Handle a create-super-table request.
 *
 * Initializes the response as an empty TDMT_VND_CREATE_STB_RSP, then decodes
 * the request, creates the super table in the meta store and registers it with
 * the SMA module. The first step that fails stops the sequence, copies terrno
 * into pRsp->code and makes the function return -1; otherwise it returns 0.
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t        rc = 0;
  SVCreateStbReq req = {0};
  SDecoder       coder;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req; || short-circuits, so later steps run only when
  // every earlier one succeeded
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0 ||        //
      metaCreateSTable(pVnode->pMeta, ver, &req) < 0 ||  //
      tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
    pRsp->code = terrno;
    rc = -1;
  }

  tDecoderClear(&coder);
  return rc;
}

761
/**
 * @brief Handle a batch create-table request.
 *
 * Every sub-request gets its own SVCreateTbRsp in the encoded response. A hash mismatch
 * or a metaCreateTable() failure is recorded per table and does not abort the batch;
 * a failed grant check aborts the whole request.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to metaCreateTable()
 * @param pReq   encoded SVCreateTbBatchReq
 * @param len    length of pReq
 * @param pRsp   response; pCont/contLen are filled with the encoded SVCreateTbBatchRsp
 * @return int32_t 0 on success, -1 on failure (terrno holds the reason)
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    memset(&cRsp, 0, sizeof(cRsp));

    if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
      rcode = -1;
      goto _exit;
    }

    if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
      rcode = -1;
      goto _exit;
    }

    // validate hash; snprintf keeps the write inside tbName whatever the name lengths are
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
      // "already exists" is not an error when the request carries IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
    goto _exit;
  }

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    taosMemoryFree(pCreateReq->comment);
    taosArrayDestroy(pCreateReq->ctb.tagName);
  }
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
  taosArrayDestroy(tbUids);
  // released here so that every exit path (grant failure, rsma update failure, OOM) frees the store
  tdUidStoreFree(pStore);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

866
/**
 * @brief Handle an alter-super-table request.
 *
 * The request is decoded as an SVCreateStbReq and applied through metaAlterSTable().
 * A decode failure sets terrno only; a meta failure copies terrno into the response.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to metaAlterSTable()
 * @param pReq   encoded request
 * @param len    length of pReq
 * @param pRsp   response to fill in (no payload is attached)
 * @return int32_t 0 on success, -1 on failure
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  int32_t        rc = -1;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dec, pReq, len);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) < 0) {
    // malformed request
    terrno = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterSTable(pVnode->pMeta, ver, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    rc = 0;
  }

  tDecoderClear(&dec);
  return rc;
}

895
/**
 * @brief Handle a drop-super-table request.
 *
 * Drops the super table from meta, removes the collected child-table uids from the tq
 * table list and notifies the rsma module. The outcome is reported through pRsp->code.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to metaDropSTable()
 * @param pReq   encoded SVDropStbReq
 * @param len    length of pReq
 * @param pRsp   response; code carries the result, no payload is attached
 * @return int32_t always 0
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};
  SArray      *tbUidList = NULL;

  // NOTE(review): a CREATE_STB response type in the drop handler looks like a copy/paste
  // leftover; kept unchanged here -- confirm against the message-type table before touching it.
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) {
    // previously fell through to _exit with rcode still SUCCESS
    rcode = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  taosArrayDestroy(tbUidList);
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

938
/**
 * @brief Handle an alter-table request.
 *
 * The result is carried inside the encoded SVAlterTbRsp (code plus optional table meta);
 * pRsp->code itself stays TSDB_CODE_SUCCESS.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to metaAlterTable()
 * @param pReq   encoded SVAlterTbReq
 * @param len    length of pReq
 * @param pRsp   response; pCont/contLen are filled with the encoded SVAlterTbRsp
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  alterReq = {0};
  SVAlterTbRsp  alterRsp = {0};
  STableMetaRsp metaRsp = {0};
  SDecoder      dc = {0};
  SEncoder      ec = {0};
  int32_t       ret;
  bool          altered = false;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode, then apply; either failure is recorded in the response body
  tDecoderInit(&dc, pReq, len);
  if (tDecodeSVAlterTbReq(&dc, &alterReq) < 0) {
    alterRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, ver, &alterReq, &metaRsp) < 0) {
    alterRsp.code = terrno;
  } else {
    altered = true;
  }
  tDecoderClear(&dc);

  // attach the refreshed table meta only when the alter produced one
  if (altered && NULL != metaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &metaRsp);
    alterRsp.pMeta = &metaRsp;
  }

  // encode the response
  tEncodeSize(tEncodeSVAlterTbRsp, &alterRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &alterRsp);
  tEncoderClear(&ec);

  if (metaRsp.pSchemas) {
    taosMemoryFree(metaRsp.pSchemas);
  }
  return 0;
}

988
/**
 * @brief Handle a batch drop-table request.
 *
 * Each sub-request gets its own SVDropTbRsp in the encoded response. Request-level
 * failures (decode error, out of memory) are reported through pRsp->code.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to metaDropTable()
 * @param pReq   encoded SVDropTbBatchReq
 * @param len    length of pReq
 * @param pRsp   response; pCont/contLen are filled with the encoded SVDropTbBatchRsp
 * @return int32_t always 0
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  SArray          *tbUids = NULL;
  STbUidStore     *pStore = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // previously left pRsp->code at SUCCESS although nothing was dropped
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
    tb_uid_t     tbUid = 0;

    ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
    if (ret < 0) {
      // "not exists" is not an error when the request asks to ignore it
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
      if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);
  tdUpdateTbUidList(pVnode->pSma, pStore, false);

_exit:
  taosArrayDestroy(tbUids);
  tdUidStoreFree(pStore);
  tDecoderClear(&decoder);
  // the response body is encoded on every path, including the failure ones
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  taosArrayDestroy(rsp.pArray);
  return 0;
}

1053 1054
/**
 * @brief Debug helper: print every row of one submit block.
 *
 * @param pMeta   meta handle used to look up the table schema
 * @param pBlock  submit block to print
 * @param msgIter message iterator positioned on pBlock (supplies suid and uid)
 * @param tags    prefix prepended to each printed row
 * @return int32_t 0 when the block has no row or was printed, -1 when no schema was found
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;
  char           rowTags[128] = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1);  // TODO: use the real schema
  if (pSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096
// Working state used while converting an SSubmitReq into an SSubmitReq2, one block at a time.
typedef struct SSubmitReqConvertCxt {
  SSubmitMsgIter msgIter;     // iterator over the blocks of the source message
  SSubmitBlk    *pBlock;      // block currently returned by tGetSubmitMsgNext()
  SSubmitBlkIter blkIter;     // iterator over the rows of pBlock
  STSRow        *pRow;        // row currently returned by tGetSubmitBlkNext()
  STSRowIter     rowIter;     // column iterator, initialised from pTbSchema
  SSubmitTbData *pTbData;     // table data being built for the current block
  STSchema      *pTbSchema;   // schema obtained from metaGetTbTSchema(); released with taosMemoryFreeClear()
  SArray        *pColValues;  // array of SColVal, one entry per column of pTbSchema
} SSubmitReqConvertCxt;

/**
 * @brief Re-arm the conversion context for the block msgIter currently points at.
 *
 * Replaces the cached schema, re-initialises the row iterator, resets (or allocates)
 * pTbData and rebuilds pColValues with one NONE value per schema column.
 *
 * @param pMeta meta handle used to look up the table schema
 * @param pCxt  conversion context to reset
 * @return int32_t TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG when no schema is found,
 *         or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  SSubmitMsgIter *pIter = &pCxt->msgIter;

  // schema: looked up by suid when it is positive, by uid otherwise
  taosMemoryFreeClear(pCxt->pTbSchema);
  pCxt->pTbSchema = metaGetTbTSchema(pMeta, pIter->suid > 0 ? pIter->suid : pIter->uid, pIter->sversion, 1);
  if (NULL == pCxt->pTbSchema) return TSDB_CODE_INVALID_MSG;

  STSchema *pSchema = pCxt->pTbSchema;
  tdSTSRowIterInit(&pCxt->rowIter, pSchema);

  // table data: reuse the existing object when there is one, otherwise allocate it
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  if (NULL == pCxt->pTbData) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pCxt->pTbData) return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitTbData *pData = pCxt->pTbData;
  pData->flags = 0;
  pData->suid = pIter->suid;
  pData->uid = pIter->uid;
  pData->sver = pIter->sversion;
  pData->pCreateTbReq = NULL;
  pData->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (NULL == pData->aRowP) return TSDB_CODE_OUT_OF_MEMORY;

  // column values: start every column as NONE
  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  if (NULL == pCxt->pColValues) return TSDB_CODE_OUT_OF_MEMORY;

  for (int32_t iCol = 0; iCol < pSchema->numOfCols; ++iCol) {
    SColVal none = COL_VAL_NONE(pSchema->columns[iCol].colId, pSchema->columns[iCol].type);
    taosArrayPush(pCxt->pColValues, &none);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release everything the conversion context still holds.
 *
 * @param pCxt context whose column values, schema and table data are freed
 */
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  taosArrayDestroy(pCxt->pColValues);
  taosMemoryFreeClear(pCxt->pTbSchema);

  // contents first, then the container itself
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  taosMemoryFreeClear(pCxt->pTbData);
}

/**
 * @brief Convert one cell of an STSRow into an SColVal.
 *
 * NONE and NULL cells only set the flag. Variable-length values are referenced, not
 * copied. Fixed-length values are stored in value.val.
 *
 * @param pCol     column descriptor (supplies the data type)
 * @param pCellVal source cell
 * @param pColVal  destination column value
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
    return TSDB_CODE_SUCCESS;
  }

  if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pColVal->value.nData = varDataLen(pCellVal->val);
    pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
  } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
    float f = GET_FLOAT_VAL(pCellVal->val);
    memcpy(&pColVal->value.val, &f, sizeof(f));
  } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
    // take the raw 8 bytes of the double; memcpy avoids reading the cell through an
    // int64_t pointer (aliasing / alignment), matching what the float branch does
    int64_t raw = 0;
    memcpy(&raw, pCellVal->val, sizeof(raw));
    pColVal->value.val = raw;
  } else {
    GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
  }

  pColVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Fill pCxt->pColValues from the current row (pCxt->pRow).
 *
 * Walks the schema columns in order and stops early when the row iterator has nothing
 * more to fetch or when a conversion fails.
 *
 * @param pCxt conversion context
 * @return int32_t TSDB_CODE_SUCCESS, or the first conversion error
 */
static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);

  for (int32_t iCol = 0; iCol < pCxt->pTbSchema->numOfCols; ++iCol) {
    STColumn *pCol = &pCxt->pTbSchema->columns[iCol];
    SCellVal  cell = {0};

    if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cell)) {
      break;
    }

    int32_t code = vnodeCellValConvertToColVal(pCol, &cell, (SColVal *)taosArrayGet(pCxt->pColValues, iCol));
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Decode the create-table request embedded in the current block, if there is one.
 *
 * Nothing is done when msgIter.schemaLen is not positive. Otherwise an SVCreateTbReq is
 * allocated, attached to pTbData and decoded from the first schemaLen bytes of the block data.
 *
 * @param pCxt conversion context
 * @return int32_t TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the decoder's result
 */
static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  const int32_t schemaLen = pCxt->msgIter.schemaLen;
  if (schemaLen <= 0) return TSDB_CODE_SUCCESS;

  SVCreateTbReq *pCreateReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  pCxt->pTbData->pCreateTbReq = pCreateReq;
  if (NULL == pCreateReq) return TSDB_CODE_OUT_OF_MEMORY;

  SDecoder dec = {0};
  tDecoderInit(&dec, (uint8_t *)pCxt->pBlock->data, schemaLen);
  int32_t rc = tDecodeSVCreateTbReq(&dec, pCreateReq);
  tDecoderClear(&dec);

  return rc;
}

/**
 * @brief Convert an SSubmitReq into an SSubmitReq2.
 *
 * For every block of the source message the table context is reset, an optional
 * create-table request is decoded, each row is rebuilt as an SRow, and the resulting
 * SSubmitTbData is appended to pReq2->aSubmitTbData.
 *
 * @param pVnode vnode whose meta supplies the table schemas
 * @param pReq   source request
 * @param pReq2  destination; aSubmitTbData is allocated here
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;  // no more blocks
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        if (NULL == pNewRow) {
          // the slot could not be reserved; do not hand a NULL target to tRowBuild
          code = TSDB_CODE_OUT_OF_MEMORY;
        } else {
          // NOTE(review): cleared first so a failed build leaves a NULL slot rather than
          // whatever the reserve returned -- presumably the destroy path tolerates NULL rows
          *pNewRow = NULL;
          code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
        }
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // the array now holds a copy of the struct; drop only the container
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}

/**
 * @brief Encode an SSubmitReq2 into a newly allocated buffer.
 *
 * @param pSubmitReq request to encode
 * @param ppMsg      receives the buffer on success (the caller frees it); untouched on failure
 * @return int32_t TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the encoder's error
 */
static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;

  // first pass only measures the encoded size
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
    tEncoderClear(&encoder);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  } else {
    // nothing was handed to the caller, so the buffer (if any) must be released here
    taosMemoryFree(pMsg);
  }
  return code;
}

1265
/**
 * @brief Handle a submit (insert) request.
 *
 * Steps: (1) obtain an SSubmitReq2, converting a version-0 message or decoding a newer
 * one; (2) validate row timestamps against the keep window and ordering; (3) validate
 * table identity, schema version and column-data layout; (4) create tables on demand and
 * insert the data; (5) encode an SSubmitRsp2 and update the vnode statistics.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to the meta/tsdb calls
 * @param pReq   request message (begins with an SSubmitReq2Msg header)
 * @param len    length of pReq
 * @param pRsp   response; code and pCont/contLen are filled in
 * @return int32_t 0 on success, otherwise the error code (also stored in terrno)
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t code = 0;
  terrno = 0;

  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
  SArray      *newTbUids = NULL;
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;

  void           *pAllocMsg = NULL;
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
  if (0 == pMsg->version) {
    // version 0: convert to the new request form and re-encode it; pReq then points at the rebuilt message
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
    if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
      tDecoderClear(&dc);  // the decoder must be released on the failure path as well
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    tDecoderClear(&dc);
  }

  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
      TSKEY    *aKey = (TSKEY *)(pColData->pData);

      // timestamps must lie inside [minKey, maxKey] and be strictly increasing
      for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
        if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
          code = TSDB_CODE_INVALID_MSG;
          // report `code`: terrno is still 0 at this point
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
      }

    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);

      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
          code = TSDB_CODE_INVALID_MSG;
          // report `code`: terrno is still 0 at this point
          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
      }
    }
  }

  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // for a child table the schema version is compared against the super table's info
      if (info.suid) {
        metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // the first column must be the non-empty primary timestamp column
      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // every column must carry the same number of values
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
  }

  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

  // loop to handle
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
      // check (TODO: move check to create table)
      code = grantCheck(TSDB_GRANT_TIMESERIES);
      if (code) goto _exit;

      code = grantCheck(TSDB_GRANT_TABLE);
      if (code) goto _exit;

      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
      if (pCreateTbRsp == NULL) {
        // the slot could not be reserved; &pCreateTbRsp->pMeta below would be invalid
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }

      // create table
      if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
        // create table success

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }

        taosArrayPush(newTbUids, &pSubmitTbData->uid);

        if (pCreateTbRsp->pMeta) {
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          goto _exit;
        }
        terrno = 0;
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
      }
    }

    // insert data
    int32_t affectedRows;
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
    if (code) goto _exit;

    code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
    if (code) goto _exit;

    pSubmitRsp->affectedRows += affectedRows;
  }

  // update the affected table uid list
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
  }

_exit:
  // message
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSSubmitRsp2(&ec, pSubmitRsp);
  tEncoderClear(&ec);

  // update statistics
  atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
  if (code == 0) {
    atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
    // NOTE(review): for version-0 messages pReq is the rebuilt buffer while len is still
    // the original length -- confirm that tdProcessRSmaSubmit tolerates this
    tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
  }

  // clear
  taosArrayDestroy(newTbUids);
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);

  if (code) terrno = code;

  taosMemoryFree(pAllocMsg);

  return code;
}
1497

1498
/**
 * @brief Handle a create-TSMA request.
 *
 * @param pVnode vnode the request is applied to
 * @param ver    version passed on to tdProcessTSmaCreate()
 * @param pReq   encoded SVCreateTSmaReq
 * @param len    length of pReq
 * @param pRsp   optional response; may be NULL, in which case nothing is reported through it
 * @return int32_t 0 on success, -1 on failure (terrno holds the reason)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, ver, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // the space before "for table" was missing, which glued the version to the next word
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
  return -1;
}
C
Cary Xu 已提交
1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545

/**
 * @brief Create a TSMA from an encoded request; specific for smaDstVnode.
 *
 * Thin wrapper around vnodeProcessCreateTSmaReq() that passes version 1 and a NULL
 * response, so no SRpcMsg is filled in.
 *
 * @param pVnode vnode the request is applied to
 * @param pCont encoded create-TSMA request
 * @param contLen length of pCont, passed through as the request length
 * @return int32_t result of vnodeProcessCreateTSmaReq(): 0 on success, -1 on failure
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
1546

1547
/**
 * @brief Trim table meta after the vnode's hash range was altered.
 *
 * @param pVnode vnode whose meta is trimmed
 * @param ver    apply index, used for logging only
 * @return int32_t result of metaTrimTables()
 */
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
  return metaTrimTables(pVnode->pMeta);
}

1559
/**
 * @brief Handle an alter-confirm message.
 *
 * When the vnode config has a pending hash change, consolidates it through
 * vnodeConsolidateAlterHashRange() and clears the flag on success. The
 * response is always filled in with an empty body and the resulting code.
 *
 * @param pVnode vnode handling the message
 * @param ver    version passed on to the consolidation step
 * @param pReq   unused here
 * @param len    unused here
 * @param pRsp   response to fill in; dereferenced unconditionally
 * @return int32_t TSDB_CODE_SUCCESS, or the negative code from consolidation
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));

  int32_t status = TSDB_CODE_SUCCESS;

  if (pVnode->config.hashChange) {
    status = vnodeConsolidateAlterHashRange(pVnode, ver);
    if (status < 0) {
      vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(),
             ver);
    } else {
      // the flag is only cleared once consolidation succeeded
      pVnode->config.hashChange = false;
    }
  }

  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = status;

  return status;
}
S
Shengliang Guan 已提交
1581

1582
/**
 * @brief Apply an alter-vnode-config request to a vnode.
 *
 * Deserializes the request, then compares each setting with the current
 * vnode config and updates the ones that differ. WAL settings are pushed to
 * the WAL via walAlter() and keep settings to the tsdb via tsdbSetKeepCfg()
 * once at the end, only when something in that group changed.
 *
 * NOTE(review): settings are applied one by one; if metaAlterCache() fails,
 * the settings handled before it stay changed and the ones after it are not
 * applied.
 *
 * @param pVnode vnode whose config is altered
 * @param ver    unused here
 * @param pReq   serialized SAlterVnodeConfigReq
 * @param len    length of pReq
 * @param pRsp   unused here
 * @return int32_t 0 on success, TSDB_CODE_INVALID_MSG if the request cannot
 *         be deserialized, or errno if altering the meta cache fails
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  bool walChanged = false;
  bool tsdbChanged = false;

  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
        "walRetentionSize:%d",
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
        req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);

  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }

  // req.buffer is scaled by 1024 * 1024 before being compared with / stored into szBuf
  const int64_t newSzBuf = req.buffer * 1024LL * 1024LL;
  if (pVnode->config.szBuf != newSzBuf) {
    // both values are printed as unsigned so the specifier matches the argument type
    vInfo("vgId:%d, vnode buffer is changed from %" PRIu64 " to %" PRIu64, TD_VID(pVnode),
          (uint64_t)pVnode->config.szBuf, (uint64_t)newSzBuf);
    pVnode->config.szBuf = newSzBuf;
  }

  if (pVnode->config.szCache != req.pages) {
    if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
      // NOTE(review): this path reports errno while the rest of the function uses terrno;
      // if metaAlterCache() does not set errno, the return value here may be 0 — confirm.
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
             pVnode->config.szCache, req.pages, tstrerror(errno));
      return errno;
    } else {
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
      pVnode->config.szCache = req.pages;
    }
  }

  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
  }

  // WAL settings: record the change now, apply once below
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.level != req.walLevel) {
    pVnode->config.walCfg.level = req.walLevel;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
    walChanged = true;
  }

  // keep settings: the config is always updated, but tsdbSetKeepCfg() is only
  // triggered when VND_IS_RSMA(pVnode) is false
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  // a value of -1 leaves the current setting untouched
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    pVnode->config.sttTrigger = req.sttTrigger;
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

  if (walChanged) {
    walAlter(pVnode->pWal, &pVnode->config.walCfg);
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
  }

  return 0;
}

1685
/**
 * @brief Apply a batch delete request: one time-range delete per named table.
 *
 * Decodes an SBatchDeleteReq, resolves every table name to its uid through a
 * meta reader, deletes [startTs, endTs] for that table and updates the table's
 * change time. Tables that cannot be found by name are skipped. A failure of
 * an individual delete or change-time update is logged (and stored in terrno)
 * but does not stop the batch.
 *
 * @param pVnode vnode the deletes are applied to
 * @param ver    version passed to tsdbDeleteTableData()
 * @param pReq   encoded SBatchDeleteReq
 * @param len    length of pReq
 * @param pRsp   unused here
 * @return int32_t 0 once the batch has been walked; -1 with terrno set to
 *         TSDB_CODE_INVALID_MSG when the request cannot be decoded
 */
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // zero-initialised so that a failed decode never leaves a garbage array pointer behind
  SBatchDeleteReq deleteReq = {0};
  SDecoder        decoder = {0};

  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSBatchDeleteReq(&decoder, &deleteReq) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    if (deleteReq.deleteReqs != NULL) {
      taosArrayDestroy(deleteReq.deleteReqs);
    }
    return -1;
  }

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);

  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
      continue;
    }

    int64_t uid = mr.me.uid;

    int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

    code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
             ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

    // reset the reader's decoder before it is reused for the next table name
    tDecoderClear(&mr.coder);
  }

  metaReaderClear(&mr);
  taosArrayDestroy(deleteReq.deleteReqs);
  // release the request decoder only after the decoded entries are no longer used
  tDecoderClear(&decoder);
  return 0;
}

1727
/**
 * @brief Apply a delete request and build the delete response.
 *
 * Decodes an SDeleteRes, deletes [skey, ekey] for every uid in its list,
 * updates each table's change time, and on success encodes an SVDeleteRsp
 * carrying the affected row count into pRsp. The uid list and the decoder are
 * released on every path after they have been set up.
 *
 * @param pVnode vnode the delete is applied to
 * @param ver    version passed to tsdbDeleteTableData()
 * @param pReq   encoded SDeleteRes
 * @param len    length of pReq
 * @param pRsp   response to fill in; dereferenced unconditionally
 * @return int32_t 0 on success; TSDB_CODE_OUT_OF_MEMORY when an allocation
 *         fails; TSDB_CODE_INVALID_MSG when the request cannot be decoded;
 *         otherwise the non-zero code of the failing delete / change-time call
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // nothing has been acquired yet, so there is nothing to release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, pReq, len);
  if (tDecodeDeleteRes(pCoder, pRes) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
  ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));

  for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
    uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
    code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
    if (code) goto _exit;
    code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs);
    if (code) goto _exit;
  }

_exit:
  // shared cleanup for the success and failure paths
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  if (code) {
    return code;
  }

  // encode the response: size it first, then allocate and fill the buffer
  SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
  int32_t     ret = 0;
  tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SEncoder ec = {0};
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVDeleteRsp(&ec, &rsp);
  tEncoderClear(&ec);
  return code;
}
1771
/**
 * @brief Handle a create-index request.
 *
 * Decodes the request as an SVCreateStbReq and passes it to
 * metaAddIndexToSTable(). On any failure the error in terrno is also copied
 * into pRsp->code, so the response never reports success for a failed request.
 *
 * @param pVnode vnode the index is created on
 * @param ver    version passed to metaAddIndexToSTable()
 * @param pReq   encoded SVCreateStbReq
 * @param len    length of pReq
 * @param pRsp   response to fill in; dereferenced unconditionally
 * @return int32_t 0 on success, -1 on failure (terrno and pRsp->code hold the reason)
 */
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
    goto _err;
  }

  tDecoderClear(&dc);
  return 0;

_err:
  // capture terrno before any cleanup call gets a chance to touch it
  pRsp->code = terrno;
  tDecoderClear(&dc);
  return -1;
}
1797
/**
 * @brief Handle a drop-index request.
 *
 * Deserializes an SDropIndexReq and passes it to metaDropIndexFromSTable().
 * On any failure the error in terrno is also copied into pRsp->code, so the
 * response never reports success for a failed request.
 *
 * @param pVnode vnode the index is dropped from
 * @param ver    version passed to metaDropIndexFromSTable()
 * @param pReq   serialized SDropIndexReq
 * @param len    length of pReq
 * @param pRsp   response to fill in; dereferenced unconditionally
 * @return int32_t TSDB_CODE_SUCCESS on success, -1 on failure (terrno and
 *         pRsp->code hold the reason)
 */
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDropIndexReq req = {0};

  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSDropIdxReq(pReq, len, &req)) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    return -1;
  }

  if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
    pRsp->code = terrno;
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}
1815

1816
// Implementation of the compact request; in this file it is only defined when TD_ENTERPRISE is not set.
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);

/**
 * @brief Handle a compact-vnode request by delegating to vnodeProcessCompactVnodeReqImpl().
 *
 * All arguments are passed through unchanged.
 *
 * @return int32_t the value returned by vnodeProcessCompactVnodeReqImpl()
 */
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t implCode = vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
  return implCode;
}
1821

H
Hongze Cheng 已提交
1822
#ifndef TD_ENTERPRISE
1823
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
1824
  return 0;
dengyihao's avatar
dengyihao 已提交
1825
}
H
Hongze Cheng 已提交
1826
#endif