vnodeSvr.c 24.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/* Forward declarations of the per-message write handlers implemented later in this file. */

/* super table requests */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* table requests */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data submit and time-range SMA requests */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27
/*
 * Pre-process a batch of write requests.
 *
 * The WAL-writing implementation is compiled out (`#if 0`), so the function
 * currently does nothing with its arguments and always returns 0.
 */
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
  *version = pVnode->state.processed;

  int i = 0;
  while (i < taosArrayGetSize(pMsgs)) {
    SRpcMsg *pRpc = *(SRpcMsg **)taosArrayGet(pMsgs, i);

    // each message is written to the WAL under the next version number
    int code = walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen);
    if (code < 0) {
      vError("vnode:%d  write wal error since %s", TD_VID(pVnode), terrstr());
      return -1;
    }

    i++;
  }

  walFsync(pVnode->pWal, false);
#endif

  return 0;
}

H
Hongze Cheng 已提交
50
/*
 * Apply one write request to the vnode.
 *
 * Records `version` as the applied version, dispatches the message to the
 * handler matching `pMsg->msgType`, pushes the message to TQ and commits the
 * vnode when vnodeShouldCommit() says so.
 *
 * pVnode   vnode the request is applied to
 * pMsg     incoming message; the payload starts with an SMsgHead
 * version  version assigned to this request
 * pRsp     response message filled in by the META/TSDB handlers
 *
 * Returns 0 on success, -1 when a META/TSDB handler or tqPushMsg() fails.
 * Failures of the TQ handlers are logged but do not fail the request.
 */
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void *pReq;
  int   len;

  vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA: {
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
    } break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler receives the full message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error; for now the failure is only logged
        vError("vgId:%d failed to process %s request in TQ, version: %" PRId64, TD_VID(pVnode),
               TMSG_INFO(pMsg->msgType), version);
      }
      break;
    case TDMT_VND_MQ_VG_DELETE:
      // this handler receives the full message, header included
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
        // TODO: handle error; for now the failure is only logged
        vError("vgId:%d failed to process %s request in TQ, version: %" PRId64, TD_VID(pVnode),
               TMSG_INFO(pMsg->msgType), version);
      }
      break;
    case TDMT_VND_TASK_DEPLOY: {
      if (tqProcessTaskDeploy(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error; for now the failure is only logged
        vError("vgId:%d failed to process %s request in TQ, version: %" PRId64, TD_VID(pVnode),
               TMSG_INFO(pMsg->msgType), version);
      }
    } break;
#if 0
    case TDMT_VND_TASK_WRITE_EXEC: {
      if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                            0) < 0) {
      }
    } break;
#endif
    case TDMT_VND_ALTER_VNODE:
      break;
    default:
      ASSERT(0);
      break;
  }

  vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vDebug("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

/*
 * Dispatch a message taken from the vnode query queue to the query worker.
 *
 * Returns the query worker's result for TDMT_VND_QUERY and
 * TDMT_VND_QUERY_CONTINUE, or TSDB_CODE_VND_APP_ERROR for any other type.
 */
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  // every field not assigned below stays zeroed
  SReadHandle handle = {0};
  handle.meta = pVnode->pMeta;
  handle.config = &pVnode->config;
  handle.vnode = pVnode;
  handle.pMsgCb = &pVnode->msgCb;

  if (pMsg->msgType == TDMT_VND_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
  }
  if (pMsg->msgType == TDMT_VND_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

/*
 * Dispatch a message taken from the fetch queue.
 *
 * Query-related types go to the query worker, TDMT_VND_TABLE_META to
 * vnodeGetTableMeta(), and consume/stream-task types to TQ. Unknown types are
 * logged and answered with TSDB_CODE_VND_APP_ERROR.
 *
 * pInfo is only read for its workerId, which is handed to tqProcessPollReq().
 */
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  // payload behind the message head; only the disabled cases below use it
  char   *pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);
  int     code;

  switch (pMsg->msgType) {
    case TDMT_VND_FETCH:
      code = qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_FETCH_RSP:
      code = qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_RES_READY:
      code = qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_TASKS_STATUS:
      code = qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_CANCEL_TASK:
      code = qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_DROP_TASK:
      code = qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    case TDMT_VND_TABLE_META:
      code = vnodeGetTableMeta(pVnode, pMsg);
      break;
    case TDMT_VND_CONSUME:
      code = tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
      break;

    case TDMT_VND_TASK_RUN:
      code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
      // the content pointer is detached from the message after the run request
      pMsg->pCont = NULL;
      break;
    case TDMT_VND_TASK_DISPATCH:
      code = tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_RECOVER:
      code = tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_DISPATCH_RSP:
      code = tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
      break;
    case TDMT_VND_TASK_RECOVER_RSP:
      code = tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
      break;

#if 0
    case TDMT_VND_TASK_PIPE_EXEC:
    case TDMT_VND_TASK_MERGE_EXEC:
      code = tqProcessTaskExec(pVnode->pTq, pBody, bodyLen, 0);
      break;
    case TDMT_VND_STREAM_TRIGGER:
      // refactor, avoid double free
      code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
      pMsg->pCont = NULL;
      break;
#endif
    case TDMT_VND_QUERY_HEARTBEAT:
      code = qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
      break;
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      code = TSDB_CODE_VND_APP_ERROR;
      break;
  }

  return code;
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

  // blockDebugShowData(data);
225
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
226 227 228
}

/*
 * Handle one sync message for this vnode.
 *
 * The message is converted into its typed sync representation, handed to the
 * matching syncNodeOn*Cb() callback and destroyed afterwards. pRsp is not
 * touched.
 *
 * Returns the callback's result, or TAOS_SYNC_PROPOSE_OTHER_ERROR when the sync
 * environment is not started or the message type is not a known sync type.
 */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  if (!syncEnvIsStart()) {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  int32_t    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
  SSyncNode *pNode = syncNodeAcquire(pVnode->sync);
  assert(pNode != NULL);

  // role and term are fetched but not used further in this function
  ESyncState role = syncGetMyRole(pVnode->sync);
  SyncTerm   term = syncGetMyTerm(pVnode->sync);
  (void)role;
  (void)term;

  // log the incoming message together with a short description of the sync node
  char  logBuf[512];
  char *nodeStr = sync2SimpleStr(pVnode->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, nodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(nodeStr);

  switch (pMsg->msgType) {
    case TDMT_VND_SYNC_TIMEOUT: {
      SyncTimeout *pTimeout = syncTimeoutFromRpcMsg2(pMsg);
      assert(pTimeout != NULL);
      code = syncNodeOnTimeoutCb(pNode, pTimeout);
      syncTimeoutDestroy(pTimeout);
    } break;

    case TDMT_VND_SYNC_PING: {
      SyncPing *pPing = syncPingFromRpcMsg2(pMsg);
      assert(pPing != NULL);
      code = syncNodeOnPingCb(pNode, pPing);
      syncPingDestroy(pPing);
    } break;

    case TDMT_VND_SYNC_PING_REPLY: {
      SyncPingReply *pPingReply = syncPingReplyFromRpcMsg2(pMsg);
      assert(pPingReply != NULL);
      code = syncNodeOnPingReplyCb(pNode, pPingReply);
      syncPingReplyDestroy(pPingReply);
    } break;

    case TDMT_VND_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pClientReq = syncClientRequestFromRpcMsg2(pMsg);
      assert(pClientReq != NULL);
      code = syncNodeOnClientRequestCb(pNode, pClientReq);
      syncClientRequestDestroy(pClientReq);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pVote = syncRequestVoteFromRpcMsg2(pMsg);
      assert(pVote != NULL);
      code = syncNodeOnRequestVoteCb(pNode, pVote);
      syncRequestVoteDestroy(pVote);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
      assert(pVoteReply != NULL);
      code = syncNodeOnRequestVoteReplyCb(pNode, pVoteReply);
      syncRequestVoteReplyDestroy(pVoteReply);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
      assert(pAppend != NULL);
      code = syncNodeOnAppendEntriesCb(pNode, pAppend);
      syncAppendEntriesDestroy(pAppend);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      assert(pAppendReply != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pNode, pAppendReply);
      syncAppendEntriesReplyDestroy(pAppendReply);
    } break;

    default:
      vError("==vnodeProcessSyncReq== error msg type:%d", pMsg->msgType);
      code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
      break;
  }

  syncNodeRelease(pNode);
  return code;
}

H
Hongze Cheng 已提交
318
/*
 * Handle TDMT_VND_CREATE_STB: decode the request, create the super table in
 * the meta store and pass the request on to tdProcessRSmaCreate().
 *
 * The response carries no payload. Returns 0 on success; on a decode or
 * metaCreateSTable() failure pRsp->code is set to terrno and -1 is returned.
 */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dc;

  // start from an empty, successful response
  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // short-circuit: the table is only created when decoding succeeded
  if (tDecodeSVCreateStbReq(&dc, &stbReq) < 0 || metaCreateSTable(pVnode->pMeta, version, &stbReq) < 0) {
    pRsp->code = terrno;
    tDecoderClear(&dc);
    return -1;
  }

  // the result of the rsma step is not checked
  tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &stbReq, &pVnode->msgCb);

  tDecoderClear(&dc);
  return 0;
}

H
Hongze Cheng 已提交
350
/*
 * Handle TDMT_VND_CREATE_TABLE: create every table of a batch request and
 * answer with one SVCreateTbRsp per table.
 *
 * Each table name is first checked with vnodeValidateTableHash(); tables that
 * fail the check get TSDB_CODE_VND_HASH_MISMATCH. An "already exists" error
 * counts as success when the request carries TD_CREATE_IF_NOT_EXISTS. The uids
 * of newly created tables are handed to TQ and to the SMA uid store.
 *
 * Returns 0 when a response was encoded into pRsp, or -1 with terrno set when
 * the request cannot be decoded or memory cannot be allocated.
 */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // Declared before any `goto _exit`: the cleanup code clears the encoder, so
  // it must be initialized on every path that reaches the label.
  SEncoder           encoder = {0};
  int                rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    // validate hash; the full name is "<db>.<table>", bounded by the buffer size
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;  // no payload is attached
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);

_exit:
  // single cleanup point: decoder and encoder are each cleared exactly once
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  return rcode;
}

H
Hongze Cheng 已提交
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
/*
 * Handle TDMT_VND_ALTER_STB: decode the request (it uses the create-stb wire
 * format) and apply it with metaAlterSTable().
 *
 * The response carries no payload. Returns 0 on success. On failure -1 is
 * returned and pRsp->code holds the error: TSDB_CODE_INVALID_MSG for a decode
 * failure, terrno for a metaAlterSTable() failure.
 */
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int            rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    // report the failure in the response too, as the alter step below does
    pRsp->code = terrno;
    rcode = -1;
    goto _exit;
  }

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
    goto _exit;
  }

_exit:
  tDecoderClear(&dc);
  return rcode;
}

H
Hongze Cheng 已提交
467
/*
 * Handle TDMT_VND_DROP_STB.
 *
 * Only the request decoding is active: the metaDropSTable() call is commented
 * out, so no table is dropped here. The outcome is reported through
 * pRsp->code (TSDB_CODE_INVALID_MSG when decoding fails); the function itself
 * always returns 0.
 */
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int          rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // answer with the drop response type (this used to be the create-stb type)
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
  //   rcode = terrno;
  //   goto _exit;
  // }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

H
Hongze Cheng 已提交
496 497
/*
 * Handle TDMT_VND_ALTER_TABLE: decode the request, apply it with
 * metaAlterTable() and encode an SVAlterTbRsp into pRsp.
 *
 * The per-request result travels in the encoded SVAlterTbRsp.code:
 * TSDB_CODE_INVALID_MSG when decoding fails, terrno when metaAlterTable()
 * fails. If the response buffer cannot be allocated, pRsp->code is set to
 * TSDB_CODE_OUT_OF_MEMORY and no payload is attached.
 *
 * Always returns 0.
 */
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq vAlterTbReq = {0};
  SVAlterTbRsp vAlterTbRsp = {0};
  SDecoder     dc = {0};
  int          ret;
  SEncoder     ec = {0};

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    // decode
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
    // process: report the real reason instead of a blanket "invalid message"
    vAlterTbRsp.code = terrno;
  }
  tDecoderClear(&dc);

  // encode the response
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never hand a NULL buffer to the encoder
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    return 0;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);
  return 0;
}

H
Hongze Cheng 已提交
537
/*
 * Handle TDMT_VND_DROP_TABLE: drop every table of a batch request and answer
 * with one SVDropTbRsp per table.
 *
 * A "table does not exist" error counts as success when the request sets
 * igNotExists. The uid array filled by metaDropTable() is handed to TQ.
 *
 * Request-level failures are reported through pRsp->code
 * (TSDB_CODE_INVALID_MSG for a decode failure, TSDB_CODE_OUT_OF_MEMORY for an
 * allocation failure). Always returns 0.
 */
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int              ret;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // do not answer "success" when nothing was processed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);

  // encode the response (the batch array may be NULL on the error paths)
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // never hand a NULL buffer to the encoder
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the per-table results are no longer needed once encoded
  taosArrayDestroy(rsp.pArray);
  return 0;
}

H
Hongze Cheng 已提交
596
/*
 * Debug helper: print every row of one submit block.
 *
 * The schema is looked up with metaGetTbTSchema() using msgIter->suid and
 * version 1, and each row is printed with tdSRowPrint(), prefixed by `tags`
 * and the table uid.
 *
 * Returns 0 when the block has no rows, -1 when no schema is found, and
 * TSDB_CODE_SUCCESS otherwise.
 */
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
  SSubmitBlkIter rowIter = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &rowIter);
  if (rowIter.row == NULL) return 0;

  STSchema *pTSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1);  // TODO: use the real schema
  if (pTSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char prefix[128] = {0};
  snprintf(prefix, sizeof(prefix), "%s: uid %" PRIi64 " ", tags, msgIter->uid);

  for (STSRow *pRow = tGetSubmitBlkNext(&rowIter); pRow != NULL; pRow = tGetSubmitBlkNext(&rowIter)) {
    tdSRowPrint(pRow, pTSchema, prefix);
  }

  taosMemoryFreeClear(pTSchema);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
628
/*
 * Debug helper: print every block of a submit request.
 *
 * Returns 0 after the last block, or -1 when the message iterator cannot be
 * initialized or advanced. The result of printing an individual block is not
 * checked.
 */
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SSubmitBlk    *pBlk = NULL;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) return -1;

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
    if (pBlk == NULL) return 0;  // the iterator is exhausted

    vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlk, &iter, tags);
  }
}

H
Hongze Cheng 已提交
645
/*
 * Handle TDMT_VND_SUBMIT: insert every block of a submit request and encode an
 * SSubmitRsp with one SSubmitBlkRsp per processed block.
 *
 * pReq/len describe the whole message (the caller passes pMsg->pCont without
 * skipping the header). Blocks that carry a table-creation request
 * (msgIter.schemaLen > 0) first create the table; an "already exists" error is
 * tolerated there. A failing tsdbInsertTableData() is recorded in that block's
 * response and does not stop the loop.
 *
 * Request-level failures are reported through pRsp->code; the function itself
 * always returns 0. The rsma step at the end only runs when both terrno and
 * pRsp->code indicate success.
 */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  int32_t        tsize = 0;
  int32_t        ret = 0;
  SEncoder       encoder = {0};

  terrno = TSDB_CODE_SUCCESS;
  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    // a failing iterator must not be treated like a clean end of message
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // this block's response is never pushed, so the error has to be
          // reported through the message-level code
          submitBlkRsp.code = terrno;
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }

      submitBlkRsp.uid = createTbReq.uid;

      // room for "<db>." followed by the table name and the terminator
      size_t fnameSize = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameSize);
      if (submitBlkRsp.tblFName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        tDecoderClear(&decoder);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameSize, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

#ifdef TD_DEBUG_PRINT_ROW
      // row dumps go to stdout, so they are limited to debug builds like the
      // dump at the top of this function
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
      tDecoderClear(&decoder);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }

_exit:
  // encode whatever was collected so far, also on the error paths
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont == NULL) {
    // never hand a NULL buffer to the encoder
    pRsp->contLen = 0;
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  }

  // the table names were only needed for encoding
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
      (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  }

  return 0;
}
756 757 758

/*
 * Handle TDMT_VND_CREATE_SMA: decode the request, stamp it with the server's
 * timezone and create the time-range SMA through tdProcessTSmaCreate().
 *
 * The response carries no payload. Returns 0 on success; on a decode or
 * creation failure pRsp->code is set to terrno and -1 is returned.
 */
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    pRsp->code = terrno;
    goto _err;
  }

  // record current timezone of server side
  req.timezoneInt = tsTimezone;

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName,
         req.indexUid, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // tstrerror(terrno) matches the other error logs in this file; the previous
  // terrstr(terrno) passed an argument to a function that is called without
  // one elsewhere
  vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName,
         req.indexUid, req.tableUid, tstrerror(terrno));
  return -1;
}