vnodeSvr.c 23.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
19
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
20
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
21
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
22
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
23
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
24
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
25
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
H
Hongze Cheng 已提交
28
#if 0
S
Shengliang Guan 已提交
29
  SRpcMsg *pMsg;
H
Hongze Cheng 已提交
30 31
  SRpcMsg  *pRpc;

H
Hongze Cheng 已提交
32
  *version = pVnode->state.processed;
H
Hongze Cheng 已提交
33
  for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
S
Shengliang Guan 已提交
34 35
    pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
    pRpc = pMsg;
H
Hongze Cheng 已提交
36 37

    // set request version
H
Hongze Cheng 已提交
38
    if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
H
refact  
Hongze Cheng 已提交
39
      vError("vnode:%d  write wal error since %s", TD_VID(pVnode), terrstr());
H
Hongze Cheng 已提交
40
      return -1;
H
Hongze Cheng 已提交
41 42 43 44
    }
  }

  walFsync(pVnode->pWal, false);
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46
#endif
H
Hongze Cheng 已提交
47
  return 0;
H
Hongze Cheng 已提交
48 49
}

H
Hongze Cheng 已提交
50
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
51
  void *ptr = NULL;
H
Hongze Cheng 已提交
52 53
  void *pReq;
  int   len;
H
Hongze Cheng 已提交
54
  int   ret;
H
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56
  vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
57
         version);
H
Hongze Cheng 已提交
58

H
Hongze Cheng 已提交
59 60
  pVnode->state.applied = version;

H
Hongze Cheng 已提交
61 62 63
  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);
H
Hongze Cheng 已提交
64

H
Hongze Cheng 已提交
65
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
H
Hongze Cheng 已提交
66
    vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
H
Hongze Cheng 已提交
67
    return -1;
H
Hongze Cheng 已提交
68 69 70
  }

  switch (pMsg->msgType) {
H
Hongze Cheng 已提交
71
    /* META */
H
Hongze Cheng 已提交
72
    case TDMT_VND_CREATE_STB:
H
Hongze Cheng 已提交
73
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
74
      break;
H
Hongze Cheng 已提交
75
    case TDMT_VND_ALTER_STB:
H
Hongze Cheng 已提交
76
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
77
      break;
H
Hongze Cheng 已提交
78
    case TDMT_VND_DROP_STB:
H
Hongze Cheng 已提交
79
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
80
      break;
H
Hongze Cheng 已提交
81
    case TDMT_VND_CREATE_TABLE:
H
Hongze Cheng 已提交
82
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
83 84
      break;
    case TDMT_VND_ALTER_TABLE:
H
Hongze Cheng 已提交
85
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
86
      break;
H
Hongze Cheng 已提交
87
    case TDMT_VND_DROP_TABLE:
H
Hongze Cheng 已提交
88
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
89
      break;
90 91
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
92 93
    } break;
    /* TSDB */
H
Hongze Cheng 已提交
94
    case TDMT_VND_SUBMIT:
H
Hongze Cheng 已提交
95
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
H
Hongze Cheng 已提交
96
      break;
H
Hongze Cheng 已提交
97
    /* TQ */
L
Liu Jicong 已提交
98 99 100 101 102 103
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                               pMsg->contLen - sizeof(SMsgHead)) < 0) {
        // TODO: handle error
      }
      break;
L
Liu Jicong 已提交
104 105 106 107 108
    case TDMT_VND_MQ_VG_DELETE:
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        // TODO: handle error
      }
      break;
H
Hongze Cheng 已提交
109 110 111 112 113 114 115 116 117 118
    case TDMT_VND_TASK_DEPLOY: {
      if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
                              pMsg->contLen - sizeof(SMsgHead)) < 0) {
      }
    } break;
    case TDMT_VND_TASK_WRITE_EXEC: {
      if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                            0) < 0) {
      }
    } break;
S
Shengliang Guan 已提交
119 120
    case TDMT_VND_ALTER_VNODE:
      break;
H
Hongze Cheng 已提交
121 122 123 124 125
    default:
      ASSERT(0);
      break;
  }

H
Hongze Cheng 已提交
126
  vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
H
Hongze Cheng 已提交
127

H
Hongze Cheng 已提交
128
  // commit if need
H
Hongze Cheng 已提交
129
  if (vnodeShouldCommit(pVnode)) {
H
Hongze Cheng 已提交
130
    vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
H
Hongze Cheng 已提交
131 132 133 134 135
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
H
Hongze Cheng 已提交
136 137 138
  }

  return 0;
H
Hongze Cheng 已提交
139 140

_err:
H
Hongze Cheng 已提交
141
  vDebug("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
H
Hongze Cheng 已提交
142 143
         tstrerror(terrno), version);
  return -1;
H
Hongze Cheng 已提交
144 145 146
}

int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
D
dapan 已提交
147
  vTrace("message in vnode query queue is processing");
148
#if 0
149
  SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
150
#endif
151
  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
H
Hongze Cheng 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
  switch (pMsg->msgType) {
    case TDMT_VND_QUERY:
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
    case TDMT_VND_QUERY_CONTINUE:
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");
  char   *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  switch (pMsg->msgType) {
    case TDMT_VND_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_FETCH_RSP:
      return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_RES_READY:
      return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_TASKS_STATUS:
      return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_VND_TASK_PIPE_EXEC:
    case TDMT_VND_TASK_MERGE_EXEC:
      return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
    case TDMT_VND_STREAM_TRIGGER:
      return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
    case TDMT_VND_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

  // blockDebugShowData(data);
202
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
203 204 205
}

int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
206
  int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
H
Hongze Cheng 已提交
207

208 209 210
  if (syncEnvIsStart()) {
    SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
    assert(pSyncNode != NULL);
M
Minghao Li 已提交
211

212 213
    ESyncState state = syncGetMyRole(pVnode->sync);
    SyncTerm   currentTerm = syncGetMyTerm(pVnode->sync);
M
Minghao Li 已提交
214

215
    SMsgHead *pHead = pMsg->pCont;
M
Minghao Li 已提交
216

217 218 219 220 221
    char  logBuf[512];
    char *syncNodeStr = sync2SimpleStr(pVnode->sync);
    snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
    syncRpcMsgLog2(logBuf, pMsg);
    taosMemoryFree(syncNodeStr);
M
Minghao Li 已提交
222

223
    SRpcMsg *pRpcMsg = pMsg;
M
Minghao Li 已提交
224

225 226 227
    if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
228

229
      ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
230
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
231

232 233 234
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
235

236
      ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
237
      syncPingDestroy(pSyncMsg);
M
Minghao Li 已提交
238

239 240 241
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
242

243
      ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
244
      syncPingReplyDestroy(pSyncMsg);
M
Minghao Li 已提交
245

246 247 248
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
249

250
      ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
251
      syncClientRequestDestroy(pSyncMsg);
M
Minghao Li 已提交
252

253 254 255
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
256

257
      ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
258
      syncRequestVoteDestroy(pSyncMsg);
M
Minghao Li 已提交
259

260 261 262
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
263

264
      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
265
      syncRequestVoteReplyDestroy(pSyncMsg);
M
Minghao Li 已提交
266

267 268 269
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);
M
Minghao Li 已提交
270

271
      ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
272
      syncAppendEntriesDestroy(pSyncMsg);
M
Minghao Li 已提交
273

274 275 276 277
    } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

278
      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
279
      syncAppendEntriesReplyDestroy(pSyncMsg);
M
Minghao Li 已提交
280

281 282
    } else {
      vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
283
      ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
284
    }
M
Minghao Li 已提交
285

286 287 288
    syncNodeRelease(pSyncNode);
  } else {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
289
    ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
290
  }
291 292

  return ret;
H
Hongze Cheng 已提交
293 294
}

H
Hongze Cheng 已提交
295
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
296
  SVCreateStbReq req = {0};
H
Hongze Cheng 已提交
297
  SDecoder       coder;
H
Hongze Cheng 已提交
298

H
Hongze Cheng 已提交
299 300 301 302 303 304
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
H
Hongze Cheng 已提交
305
  tDecoderInit(&coder, pReq, len);
H
Hongze Cheng 已提交
306 307

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
H
Hongze Cheng 已提交
308 309
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
310 311
  }

H
Hongze Cheng 已提交
312
  if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
H
Hongze Cheng 已提交
313 314
    pRsp->code = terrno;
    goto _err;
H
Hongze Cheng 已提交
315 316
  }

317
  tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);
C
Cary Xu 已提交
318

H
Hongze Cheng 已提交
319
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
320
  return 0;
H
Hongze Cheng 已提交
321 322

_err:
H
Hongze Cheng 已提交
323
  tDecoderClear(&coder);
H
Hongze Cheng 已提交
324
  return -1;
H
Hongze Cheng 已提交
325 326
}

H
Hongze Cheng 已提交
327
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
328
  SDecoder           decoder = {0};
H
Hongze Cheng 已提交
329 330 331
  int                rcode = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
H
Hongze Cheng 已提交
332 333 334
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
C
Cary Xu 已提交
335
  STbUidStore       *pStore = NULL;
336
  SArray            *tbUids = NULL;
H
Hongze Cheng 已提交
337 338

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
H
Hongze Cheng 已提交
339 340 341
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
H
Hongze Cheng 已提交
342

H
Hongze Cheng 已提交
343
  // decode
H
Hongze Cheng 已提交
344 345
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
346 347 348 349
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }
H
Hongze Cheng 已提交
350

H
Hongze Cheng 已提交
351
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
352 353
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
H
Hongze Cheng 已提交
354 355 356 357 358
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
359 360 361
  // loop to create table
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
H
Hongze Cheng 已提交
362 363 364 365 366 367 368 369 370 371

    // validate hash
    sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
H
Hongze Cheng 已提交
372
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
H
Hongze Cheng 已提交
373 374 375 376 377
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
H
Hongze Cheng 已提交
378
    } else {
H
Hongze Cheng 已提交
379
      cRsp.code = TSDB_CODE_SUCCESS;
380
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
381
      taosArrayPush(tbUids, &pCreateReq->uid);
H
Hongze Cheng 已提交
382
    }
H
Hongze Cheng 已提交
383 384

    taosArrayPush(rsp.pArray, &cRsp);
H
Hongze Cheng 已提交
385 386
  }

H
Hongze Cheng 已提交
387
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
388

389
  tqUpdateTbUidList(pVnode->pTq, tbUids);
390 391
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);
C
Cary Xu 已提交
392

H
Hongze Cheng 已提交
393
  // prepare rsp
H
Hongze Cheng 已提交
394 395
  SEncoder encoder = {0};
  int32_t  ret = 0;
wafwerar's avatar
wafwerar 已提交
396
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
H
Hongze Cheng 已提交
397 398 399 400 401 402
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
H
Hongze Cheng 已提交
403 404 405
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
406

H
Hongze Cheng 已提交
407
_exit:
H
Hongze Cheng 已提交
408
  taosArrayDestroy(rsp.pArray);
409
  taosArrayDestroy(tbUids);
H
Hongze Cheng 已提交
410 411
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
412
  return rcode;
H
Hongze Cheng 已提交
413 414
}

H
Hongze Cheng 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
H
Hongze Cheng 已提交
431
  }
H
Hongze Cheng 已提交
432 433 434 435 436 437 438 439 440

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  tDecoderClear(&dc);

H
Hongze Cheng 已提交
441 442 443
  return 0;
}

H
Hongze Cheng 已提交
444
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
445 446
  SVDropStbReq req = {0};
  int          rcode = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
447
  SDecoder     decoder = {0};
H
Hongze Cheng 已提交
448 449 450 451 452 453

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
H
Hongze Cheng 已提交
454 455
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
H
Hongze Cheng 已提交
456 457 458 459 460
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
H
Hongze Cheng 已提交
461 462 463 464
  // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
  //   rcode = terrno;
  //   goto _exit;
  // }
H
Hongze Cheng 已提交
465 466 467 468

  // return rsp
_exit:
  pRsp->code = rcode;
H
Hongze Cheng 已提交
469
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
470 471 472
  return 0;
}

H
Hongze Cheng 已提交
473 474
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq vAlterTbReq = {0};
H
Hongze Cheng 已提交
475
  SVAlterTbRsp vAlterTbRsp = {0};
H
Hongze Cheng 已提交
476
  SDecoder     dc = {0};
H
Hongze Cheng 已提交
477 478 479
  int          rcode = 0;
  int          ret;
  SEncoder     ec = {0};
H
Hongze Cheng 已提交
480 481 482 483 484 485 486 487 488 489

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
490
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
491
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
492 493
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
494 495 496 497
  }

  // process
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
H
Hongze Cheng 已提交
498
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
499
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
500 501
    rcode = -1;
    goto _exit;
H
Hongze Cheng 已提交
502 503
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
504 505 506 507 508 509 510

_exit:
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
H
Hongze Cheng 已提交
511 512 513
  return 0;
}

H
Hongze Cheng 已提交
514
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
515 516
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
H
Hongze Cheng 已提交
517
  SDecoder         decoder = {0};
H
Hongze Cheng 已提交
518
  SEncoder         encoder = {0};
H
Hongze Cheng 已提交
519 520
  int              ret;

H
Hongze Cheng 已提交
521
  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
H
Hongze Cheng 已提交
522 523 524
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
525 526

  // decode req
H
Hongze Cheng 已提交
527 528
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
H
Hongze Cheng 已提交
529 530 531 532 533
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }
H
Hongze Cheng 已提交
534 535

  // process req
H
Hongze Cheng 已提交
536
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
H
Hongze Cheng 已提交
537 538 539
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
H
Hongze Cheng 已提交
540

H
Hongze Cheng 已提交
541 542 543
    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
    if (ret < 0) {
H
Hongze Cheng 已提交
544 545 546 547 548
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
H
Hongze Cheng 已提交
549
    } else {
H
Hongze Cheng 已提交
550
      dropTbRsp.code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
551 552 553 554 555 556
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

_exit:
H
Hongze Cheng 已提交
557
  tDecoderClear(&decoder);
H
Hongze Cheng 已提交
558 559 560 561 562
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVDropTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
H
Hongze Cheng 已提交
563 564 565
  return 0;
}

H
Hongze Cheng 已提交
566
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
D
dapan 已提交
567 568 569 570 571 572 573 574 575 576 577
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  tb_uid_t       suid = 0;
  STSRow        *row = NULL;

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;
  if (!pSchema || (suid != msgIter->suid)) {
    if (pSchema) {
      taosMemoryFreeClear(pSchema);
    }
H
Hongze Cheng 已提交
578
    pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1);  // TODO: use the real schema
D
dapan 已提交
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597
    if (pSchema) {
      suid = msgIter->suid;
    }
  }
  if (!pSchema) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }
  char __tags[128] = {0};
  snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, __tags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
598
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
C
Cary Xu 已提交
599 600 601 602 603 604 605 606 607
  ASSERT(pMsg != NULL);
  SSubmitMsgIter msgIter = {0};
  SMeta         *pMeta = pVnode->pMeta;
  SSubmitBlk    *pBlock = NULL;

  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
  while (true) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
    if (pBlock == NULL) break;
H
Hongze Cheng 已提交
608

D
dapan 已提交
609 610
    vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
  }
C
Cary Xu 已提交
611 612 613 614

  return 0;
}

H
Hongze Cheng 已提交
615
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
H
Hongze Cheng 已提交
616
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
H
Hongze Cheng 已提交
617
  SSubmitRsp     submitRsp = {0};
H
Hongze Cheng 已提交
618 619 620 621
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock;
  SSubmitRsp     rsp = {0};
  SVCreateTbReq  createTbReq = {0};
H
Hongze Cheng 已提交
622
  SDecoder       decoder = {0};
H
Hongze Cheng 已提交
623
  int32_t        nRows;
H
Hongze Cheng 已提交
624 625
  int32_t        tsize, ret;
  SEncoder       encoder = {0};
626
  terrno = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
627 628

  pRsp->code = 0;
C
Cary Xu 已提交
629

C
Cary Xu 已提交
630 631 632 633
#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

C
Cary Xu 已提交
634 635 636 637 638
  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

H
Hongze Cheng 已提交
639
  // handle the request
H
Hongze Cheng 已提交
640 641 642
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
H
Hongze Cheng 已提交
643 644
  }

H
Hongze Cheng 已提交
645
  submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
646 647 648 649 650
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
651
  for (int i = 0;;) {
H
Hongze Cheng 已提交
652 653 654
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

H
Hongze Cheng 已提交
655 656
    SSubmitBlkRsp submitBlkRsp = {0};

H
Hongze Cheng 已提交
657 658
    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
H
Hongze Cheng 已提交
659 660
      submitBlkRsp.hashMeta = 1;

H
Hongze Cheng 已提交
661 662
      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
H
Hongze Cheng 已提交
663
        pRsp->code = TSDB_CODE_INVALID_MSG;
H
Hongze Cheng 已提交
664
        tDecoderClear(&decoder);
H
Hongze Cheng 已提交
665 666 667 668 669
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
H
Hongze Cheng 已提交
670
          submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
671
          tDecoderClear(&decoder);
H
Hongze Cheng 已提交
672 673 674 675
          goto _exit;
        }
      }

H
Hongze Cheng 已提交
676
      submitBlkRsp.uid = createTbReq.uid;
D
dapan 已提交
677 678
      submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
      sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
H
Hongze Cheng 已提交
679

H
Hongze Cheng 已提交
680 681 682 683 684 685 686
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

D
dapan 已提交
687
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
H
Hongze Cheng 已提交
688
      tDecoderClear(&decoder);
D
dapan1121 已提交
689 690 691
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
H
Hongze Cheng 已提交
692 693
    }

H
Hongze Cheng 已提交
694
    if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
H
Hongze Cheng 已提交
695
      submitBlkRsp.code = terrno;
H
Hongze Cheng 已提交
696 697
    }

H
Hongze Cheng 已提交
698 699 700
    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
H
Hongze Cheng 已提交
701
  }
702

H
Hongze Cheng 已提交
703
_exit:
H
Hongze Cheng 已提交
704 705 706 707 708 709 710 711
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  pRsp->contLen = tsize;
  tEncoderInit(&encoder, pRsp->pCont, tsize);
  tEncodeSSubmitRsp(&encoder, &submitRsp);
  tEncoderClear(&encoder);

  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
D
dapan 已提交
712
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
H
Hongze Cheng 已提交
713 714 715
  }

  taosArrayDestroy(submitRsp.pArray);
H
Hongze Cheng 已提交
716

717 718 719 720
  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
      (pRsp->code == TSDB_CODE_SUCCESS)) {
721
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
722
  }
C
Cary Xu 已提交
723

H
Hongze Cheng 已提交
724
  return 0;
L
Liu Jicong 已提交
725
}
726 727 728

static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
H
Hongze Cheng 已提交
729
  SDecoder        coder;
730 731 732 733 734 735 736 737 738 739 740 741

  pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    pRsp->code = terrno;
    goto _err;
742
  }
C
Cary Xu 已提交
743

C
Cary Xu 已提交
744 745 746 747
  // record current timezone of server side
  req.timezoneInt = tsTimezone;

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
748 749
    pRsp->code = terrno;
    goto _err;
750
  }
C
Cary Xu 已提交
751

752
  tDecoderClear(&coder);
C
Cary Xu 已提交
753 754
  vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName,
         req.indexUid, req.tableUid);
H
Hongze Cheng 已提交
755
  return 0;
756 757 758

_err:
  tDecoderClear(&coder);
C
Cary Xu 已提交
759 760
  vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName,
         req.indexUid, req.tableUid, terrstr(terrno));
761
  return -1;
L
Liu Jicong 已提交
762
}