vnodeSvr.c 21.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/*
 * Per-message-type handlers invoked from the switch in vnodeProcessWriteReq().
 * Each receives the vnode, the request version, the request buffer with its
 * length, and the response message to fill in.
 */

/* META: super tables */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* META: tables */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* TSDB: data submission */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* META: time-range SMA */
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27
/**
 * Pre-processing hook for a batch of write requests.
 *
 * The WAL-writing implementation below is compiled out (`#if 0`), so the
 * function currently ignores its arguments, does not write `*version`, and
 * always reports success.
 *
 * @param pVnode  target vnode (unused while the body is disabled)
 * @param pMsgs   array of pending messages (unused while the body is disabled)
 * @param version output version (not written while the body is disabled)
 * @return always 0
 */
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
  // Disabled: append every message to the WAL under the next version, then fsync.
  *version = pVnode->state.processed;

  for (int iMsg = 0; iMsg < taosArrayGetSize(pMsgs); iMsg++) {
    SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pMsgs, iMsg);
    SRpcMsg  *pRpcMsg = &pNodeMsg->rpcMsg;

    // set request version
    if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpcMsg->msgType, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
      vError("vnode:%d  write wal error since %s", TD_VID(pVnode), terrstr());
      return -1;
    }
  }

  walFsync(pVnode->pWal, false);

#endif
  return 0;
}

H
Hongze Cheng 已提交
50
/**
 * Process one message taken from the vnode write queue.
 *
 * Steps, in order:
 *   1. record `version` as the vnode's applied version,
 *   2. hand the raw message (header included) to the TQ via tqPushMsg(),
 *   3. dispatch on `pMsg->msgType` to the matching handler,
 *   4. commit and start a new transaction when vnodeShouldCommit() says so.
 *
 * @param pVnode  vnode that owns the request
 * @param pMsg    incoming message; `pCont` starts with an SMsgHead
 * @param version version assigned to this request
 * @param pRsp    response message filled in by the META/TSDB handlers
 * @return 0 on success; -1 when tqPushMsg() or a META/TSDB handler fails.
 *         Failures of the TQ handlers are logged but do not fail the request.
 */
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void *pReq;
  int   len;

  vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  // the TQ receives the full message, header included
  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler is given the whole message, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error (currently only logged, the request is still treated as applied)
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
      break;
    case TDMT_VND_TASK_DEPLOY:
      if (tqProcessTaskDeploy(pVnode->pTq, pReq, len) < 0) {
        // TODO: handle error (currently only logged)
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
      break;
    case TDMT_VND_TASK_WRITE_EXEC:
      if (tqProcessTaskExec(pVnode->pTq, pReq, len, 0) < 0) {
        // TODO: handle error (currently only logged)
        vError("vgId:%d failed to process %s in TQ, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
               version);
      }
      break;
    case TDMT_VND_ALTER_VNODE:
      break;
    default:
      ASSERT(0);
      break;
  }

  vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);

    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vDebug("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}

/**
 * Process one message taken from the vnode query queue.
 *
 * Builds a read handle from the vnode's meta, config and message callbacks and
 * forwards the message to the query worker.
 *
 * @param pVnode vnode that owns the query worker (`pVnode->pQuery`)
 * @param pMsg   incoming message; only TDMT_VND_QUERY and
 *               TDMT_VND_QUERY_CONTINUE are accepted
 * @return the query worker's return value, or TSDB_CODE_VND_APP_ERROR for an
 *         unexpected message type
 */
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");
#if 0
  SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
#endif
  SReadHandle handle = {
      .meta = pVnode->pMeta,
      .config = &pVnode->config,
      .vnode = pVnode,
      .pMsgCb = &pVnode->msgCb,
  };

  if (pMsg->msgType == TDMT_VND_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
  }
  if (pMsg->msgType == TDMT_VND_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}

/**
 * Process one message taken from the vnode fetch queue.
 *
 * Query-related types go to the query worker, table-meta requests to
 * vnodeGetTableMeta(), and consume/stream types to the TQ.
 *
 * @param pVnode vnode that owns the query worker and the TQ
 * @param pMsg   incoming message; `pCont` starts with an SMsgHead
 * @param pInfo  queue info; only `workerId` is read (for TDMT_VND_CONSUME)
 * @return the selected handler's return value, or TSDB_CODE_VND_APP_ERROR for
 *         an unexpected message type
 */
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_VND_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_FETCH_RSP:
      return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_RES_READY:
      return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_TASKS_STATUS:
      return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
    case TDMT_VND_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);

    /* meta */
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);

    /* TQ */
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_VND_TASK_PIPE_EXEC:
    case TDMT_VND_TASK_MERGE_EXEC: {
      // task execution receives the payload without the message header
      char   *pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
      int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);
      return tqProcessTaskExec(pVnode->pTq, pBody, bodyLen, 0);
    }
    case TDMT_VND_STREAM_TRIGGER:
      // the stream trigger receives the full message, header included
      return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}

// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
  // TODO

  // blockDebugShowData(data);
197
  tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
H
Hongze Cheng 已提交
198 199 200
}

/**
 * Process one message taken from the vnode sync queue.
 *
 * When the sync environment is running, the sync node is acquired, the message
 * is logged, converted to its typed form, passed to the matching syncNodeOn*Cb
 * callback, destroyed, and the sync node is released again.
 *
 * @param pVnode vnode whose `sync` handle identifies the sync node
 * @param pMsg   incoming sync message
 * @param pRsp   not used by this function
 * @return the callback's return value; TAOS_SYNC_PROPOSE_OTHER_ERROR when the
 *         sync environment is stopped or the message type is not a known sync
 *         message
 */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  if (!syncEnvIsStart()) {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  int32_t    ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  assert(pSyncNode != NULL);

  // These values are fetched but not consulted below; the calls are kept as in the original.
  ESyncState state = syncGetMyRole(pVnode->sync);
  SyncTerm   currentTerm = syncGetMyTerm(pVnode->sync);
  SMsgHead  *pHead = pMsg->pCont;
  (void)state;
  (void)currentTerm;
  (void)pHead;

  // log the incoming message together with a short description of the sync node
  char  logBuf[512];
  char *syncNodeStr = sync2SimpleStr(pVnode->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  switch (pMsg->msgType) {
    case TDMT_VND_SYNC_TIMEOUT: {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_PING: {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_PING_REPLY: {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
      syncClientRequestDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } break;

    case TDMT_VND_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } break;

    default:
      vError("==vnodeProcessSyncReq== error msg type:%d", pMsg->msgType);
      ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
      break;
  }

  syncNodeRelease(pSyncNode);
  return ret;
}

H
Hongze Cheng 已提交
290
/**
 * Handle TDMT_VND_CREATE_STB: decode the request, create the super table in
 * the meta store, then notify the SMA module via tdProcessRSmaCreate().
 *
 * @param pVnode  vnode whose meta/SMA handles are used
 * @param version version of this request
 * @param pReq    encoded SVCreateStbReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; gets the _RSP message type, an empty body, and
 *                `terrno` as `code` when decoding or creation fails
 * @return 0 on success, -1 on decode or meta failure
 */
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  int            rcode = -1;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) < 0 || metaCreateSTable(pVnode->pMeta, version, &stbReq) < 0) {
    pRsp->code = terrno;
  } else {
    tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &stbReq, &pVnode->msgCb);
    rcode = 0;
  }

  tDecoderClear(&decoder);
  return rcode;
}

H
Hongze Cheng 已提交
322
/**
 * Handle TDMT_VND_CREATE_TABLE: decode a batch of create-table requests,
 * create each table, and encode one result code per table into the response.
 *
 * A per-table failure does not abort the batch; it is reported through that
 * table's entry in the response. "Already exists" counts as success when the
 * request carries TD_CREATE_IF_NOT_EXISTS.
 *
 * @param pVnode  vnode whose meta/SMA handles are used
 * @param version version of this request
 * @param pReq    encoded SVCreateTbBatchReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; `pCont`/`contLen` receive the encoded
 *                SVCreateTbBatchRsp on success
 * @return 0 on success; -1 with `terrno` set when the batch cannot be decoded
 *         or memory for the response cannot be allocated
 */
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  // Declared before the first `goto _exit`, so no jump bypasses an initializer.
  SDecoder           decoder = {0};
  SEncoder           encoder = {0};
  int                rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVCreateTbRsp));
  if (rsp.pArray == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbRsp cRsp = {0};

    pCreateReq = req.pReqs + iReq;

    // validate hash (bounded formatting: tbName has a fixed size)
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      // remember the new table's uid so the SMA uid list can be updated after the loop
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tdUpdateTbUidList(pVnode->pSma, pStore);
  tdUidStoreFree(pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
  // the encoder is cleared only here, on the one path that initialised it
  tEncoderClear(&encoder);

_exit:
  taosArrayDestroy(rsp.pArray);
  // tDecoderInit() ran before every jump to this label, so one clear here is enough
  tDecoderClear(&decoder);
  return rcode;
}

H
Hongze Cheng 已提交
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
/**
 * Handle TDMT_VND_ALTER_STB: decode the request and alter the super table in
 * the meta store.
 *
 * @param pVnode  vnode whose meta handle is used
 * @param version version of this request
 * @param pReq    encoded SVCreateStbReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; gets the _RSP message type, an empty body, and an
 *                error `code` on failure
 * @return 0 on success; -1 on decode failure (`terrno` and `pRsp->code` are
 *         TSDB_CODE_INVALID_MSG) or meta failure (`pRsp->code` is `terrno`)
 */
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int            rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    // decode req failed: report it through the response too, like the meta failure below
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    rcode = -1;
  } else if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  tDecoderClear(&dc);
  return rcode;
}

H
Hongze Cheng 已提交
434
/**
 * Handle TDMT_VND_DROP_STB: decode the request and report the outcome.
 *
 * The actual drop (metaDropSTable) is still commented out, so only decoding is
 * performed at the moment.
 *
 * @param pVnode  vnode the request is addressed to
 * @param version version of this request
 * @param pReq    encoded SVDropStbReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; `code` is TSDB_CODE_INVALID_MSG when decoding
 *                fails, TSDB_CODE_SUCCESS otherwise
 * @return always 0; failures are reported through `pRsp->code` only
 */
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int          rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // reply with the drop-stb response type (was the create-stb response type)
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
  //   rcode = terrno;
  //   goto _exit;
  // }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}

H
Hongze Cheng 已提交
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
/**
 * Handle TDMT_VND_ALTER_TABLE: decode the request and alter the table in the
 * meta store.
 *
 * @param pVnode  vnode whose meta handle is used
 * @param version version of this request
 * @param pReq    encoded SVAlterTbReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; `code` is TSDB_CODE_INVALID_MSG on decode failure
 *                or `terrno` on meta failure
 * @return 0 on success, -1 on decode or meta failure
 */
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq alterReq = {0};
  SDecoder     decoder = {0};
  int          rcode = -1;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVAlterTbReq(&decoder, &alterReq) < 0) {
    // decode
    pRsp->code = TSDB_CODE_INVALID_MSG;
  } else if (metaAlterTable(pVnode->pMeta, version, &alterReq) < 0) {
    // process
    pRsp->code = terrno;
  } else {
    rcode = 0;
  }

  tDecoderClear(&decoder);
  return rcode;
}

H
Hongze Cheng 已提交
492
/**
 * Handle TDMT_VND_DROP_TABLE: decode a batch of drop-table requests, drop each
 * table, and encode one result code per table into the response.
 *
 * A per-table failure does not abort the batch. "Table does not exist" counts
 * as success when the request has `igNotExists` set.
 *
 * @param pVnode  vnode whose meta handle is used
 * @param version version of this request
 * @param pReq    encoded SVDropTbBatchReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; `pCont`/`contLen` receive the encoded
 *                SVDropTbBatchRsp, `code` carries decode/allocation errors
 * @return always 0; failures are reported through `pRsp->code` only
 */
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int              ret;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (rsp.pArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    /* code */
    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
    if (ret < 0) {
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

_exit:
  tDecoderClear(&decoder);

  // The response body is encoded on the error paths as well (with whatever `rsp` holds).
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    // do not advertise a body that could not be allocated
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the per-table result array is only needed for encoding; release it (was leaked)
  taosArrayDestroy(rsp.pArray);
  return 0;
}

H
Hongze Cheng 已提交
544
/**
 * Debug helper: print every row of one submit block.
 *
 * @param pMeta   meta handle used to look up the table schema
 * @param pBlock  submit block whose rows are printed
 * @param msgIter message iterator positioned on `pBlock`; `suid` selects the
 *                schema and `uid` is included in the printed label
 * @param tags    prefix for every printed line
 * @return 0 when the block has no rows, -1 when no schema is found,
 *         TSDB_CODE_SUCCESS after printing
 */
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
  SSubmitBlkIter rowIter = {0};

  tInitSubmitBlkIter(msgIter, pBlock, &rowIter);
  if (rowIter.row == NULL) return 0;

  STSchema *pTSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1);  // TODO: use the real schema
  if (pTSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char label[128] = {0};
  snprintf(label, sizeof(label), "%s: uid %" PRIi64 " ", tags, msgIter->uid);

  for (STSRow *pRow = tGetSubmitBlkNext(&rowIter); pRow != NULL; pRow = tGetSubmitBlkNext(&rowIter)) {
    tdSRowPrint(pRow, pTSchema, label);
  }

  taosMemoryFreeClear(pTSchema);

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
576
/**
 * Debug helper: print the rows of every block in a submit request.
 *
 * @param pVnode vnode whose meta handle is used for schema lookup
 * @param pMsg   submit request to walk; must not be NULL
 * @param tags   prefix for every printed line
 * @return 0 after all blocks were visited, -1 when the message iterator
 *         cannot be initialised or advanced
 */
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);
  SSubmitMsgIter iter = {0};
  SMeta         *pMetaHandle = pVnode->pMeta;
  SSubmitBlk    *pBlk = NULL;
  int            rc;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) return -1;

  // stop on an iterator error or when no block is left
  while ((rc = tGetSubmitMsgNext(&iter, &pBlk)) >= 0 && pBlk != NULL) {
    vnodeDebugPrintSingleSubmitMsg(pMetaHandle, pBlk, &iter, tags);
  }

  return (rc < 0) ? -1 : 0;
}

H
Hongze Cheng 已提交
593
/**
 * Handle TDMT_VND_SUBMIT: insert every block of a submit request into the
 * TSDB, auto-creating the target table when a block carries a create-table
 * request, and encode an SSubmitRsp into the response.
 *
 * @param pVnode  vnode whose meta/TSDB/SMA handles are used
 * @param version version of this request
 * @param pReq    the complete submit message (an SSubmitReq)
 * @param len     length of `pReq` in bytes (not used by this function)
 * @param pRsp    response; `pCont`/`contLen` receive the encoded SSubmitRsp,
 *                `code` carries request-level errors
 * @return always 0; failures are reported through `pRsp->code` and the
 *         per-block codes inside the encoded response
 */
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  SEncoder       encoder = {0};
  int32_t        tsize, ret;

  terrno = TSDB_CODE_SUCCESS;

  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
  if (!submitRsp.pArray) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (;;) {
    // an iterator error means the message is malformed; do not treat it as end-of-message
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    // create table for auto create table mode
    if (msgIter.schemaLen > 0) {
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // this block's rsp is never pushed, so surface the failure on the request itself
          submitBlkRsp.code = terrno;
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }

      submitBlkRsp.uid = createTbReq.uid;

      // "<dbname>.<table>" plus the terminating NUL
      size_t fnameLen = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameLen);
      if (submitBlkRsp.tblFName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        tDecoderClear(&decoder);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameLen, "%s.%s", pVnode->config.dbname, createTbReq.name);

      // point the iterator at the table that was just created / already existed
      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

      // NOTE(review): this prints rows unconditionally, unlike the TD_DEBUG_PRINT_ROW-guarded
      // call at the top of the function -- confirm whether it should be guarded too.
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
      tDecoderClear(&decoder);
    }

    if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }

_exit:
  // The response body is encoded on the error paths as well.
  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont == NULL) {
    // do not advertise a body that could not be allocated
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
  } else {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  }

  // release the table names allocated for auto-created tables
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }

  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
      (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  }

  return 0;
}
696 697 698

/**
 * Handle TDMT_VND_CREATE_SMA: decode the request and register the time-range
 * SMA in the meta store.
 *
 * @param pVnode  vnode whose meta handle is used
 * @param version version of this request
 * @param pReq    encoded SVCreateTSmaReq (message header already skipped)
 * @param len     length of `pReq` in bytes
 * @param pRsp    response; gets the _RSP message type, an empty body, and
 *                `terrno` as `code` when decoding or creation fails
 * @return 0 on success, -1 on decode or meta failure
 */
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
  SVCreateTSmaReq smaReq = {0};
  SDecoder        decoder;
  int             rcode = 0;

  pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&decoder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&decoder, &smaReq) < 0 || metaCreateTSma(pVnode->pMeta, version, &smaReq) < 0) {
    pRsp->code = terrno;
    rcode = -1;
  }

  tDecoderClear(&decoder);
  return rcode;
}