vnodeWrite.c 2.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#include "vnodeDef.h"

H
refact  
Hongze Cheng 已提交
18
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
H
more  
Hongze Cheng 已提交
19 20
  SRpcMsg *  pMsg;
  SVnodeReq *pVnodeReq;
H
more  
Hongze Cheng 已提交
21

H
more  
Hongze Cheng 已提交
22 23
  for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
    pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
H
more  
Hongze Cheng 已提交
24

H
more  
Hongze Cheng 已提交
25 26 27 28 29 30 31
    // ser request version
    pVnodeReq = (SVnodeReq *)(pMsg->pCont);
    pVnodeReq->ver = pVnode->state.processed++;

    if (walWrite(pVnode->pWal, pVnodeReq->ver, pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver)) < 0) {
      // TODO: handle error
    }
H
more  
Hongze Cheng 已提交
32 33
  }

H
more  
Hongze Cheng 已提交
34
  walFsync(pVnode->pWal, false);
H
refact  
Hongze Cheng 已提交
35

H
more  
Hongze Cheng 已提交
36 37 38 39 40
  // Apply each request now
  for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
    pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
    pVnodeReq = (SVnodeReq *)(pMsg->pCont);
    SVCreateTableReq ctReq;
H
refact  
Hongze Cheng 已提交
41

H
more  
Hongze Cheng 已提交
42 43 44 45 46
    // Apply the request
    {
      void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
      if (ptr == NULL) {
        // TODO: handle error
H
more  
Hongze Cheng 已提交
47
      }
H
more  
Hongze Cheng 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75

      memcpy(ptr, pVnodeReq, pMsg->contLen);

      // todo: change the interface here
      if (tqPushMsg(pVnode->pTq, pVnodeReq->req, pVnodeReq->ver) < 0) {
        // TODO: handle error
      }

      switch (pMsg->msgType) {
        case TSDB_MSG_TYPE_CREATE_TABLE:
          if (vnodeParseCreateTableReq(pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver), &(ctReq)) < 0) {
            // TODO: handle error
          }

          if (metaCreateTable(pVnode->pMeta, &ctReq) < 0) {
            // TODO: handle error
          }

          // TODO: maybe need to clear the requst struct
          break;
        case TSDB_MSG_TYPE_DROP_TABLE:
          /* code */
          break;
        case TSDB_MSG_TYPE_SUBMIT:
          /* code */
          break;
        default:
          break;
H
more  
Hongze Cheng 已提交
76
      }
H
more  
Hongze Cheng 已提交
77 78 79 80 81 82 83 84
    }

    pVnode->state.applied = pVnodeReq->ver;

    // Check if it needs to commit
    if (vnodeShouldCommit(pVnode)) {
      if (vnodeAsyncCommit(pVnode) < 0) {
        // TODO: handle error
H
more  
Hongze Cheng 已提交
85
      }
H
more  
Hongze Cheng 已提交
86
    }
H
refact  
Hongze Cheng 已提交
87 88
  }

H
more  
Hongze Cheng 已提交
89
  return 0;
H
more  
Hongze Cheng 已提交
90 91
}

H
more  
Hongze Cheng 已提交
92 93 94 95 96
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
97
/* ------------------------ STATIC METHODS ------------------------ */