dnodeMgmt.c 4.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18
#include "tlog.h"
S
slguan 已提交
19 20
#include "taoserror.h"
#include "taosmsg.h"
H
hzcheng 已提交
21
#include "trpc.h"
S
slguan 已提交
22
#include "dnodeWrite.h"
S
slguan 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#include "dnodeRead.h"
#include "dnodeMgmt.h"

typedef struct {
  int32_t vgId;     // global vnode group ID
  int     status;   // status: master, slave, notready, deleting
  int     refCount; // reference count
  int64_t version;
  void    *wworker;
  void    *rworker;
  void    *wal;
  void    *tsdb;
  void    *replica;
  void    *events;
  void    *cq;      // continuous query
} SVnodeObj;

static int  dnodeOpenVnodes();
static void dnodeCleanupVnodes();
static int  dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg);
static int  dnodeDropVnode(SVnodeObj *pVnode);
static void dnodeRemoveVnode(SVnodeObj *pVnode);

static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);

int dnodeInitMgmt() {
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg;
S
#1177  
slguan 已提交
53 54
}

S
slguan 已提交
55 56
void dnodeCleanupMgmt() {

S
#1177  
slguan 已提交
57 58
}

S
slguan 已提交
59 60 61 62 63 64
void dnodeMgmt(SRpcMsg *pMsg) {
  
  terrno = 0;

  if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
    (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
S
slguan 已提交
65
  } else {
S
slguan 已提交
66
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;  
S
slguan 已提交
67
  }
S
slguan 已提交
68 69 70 71 72 73 74

  SRpcMsg rsp;
  rsp.handle = pMsg->handle;
  rsp.code = terrno;
  rsp.pCont = NULL;
  rpcSendResponse(&rsp);
  rpcFreeCont(pMsg->pCont);  // free the received message
S
#1177  
slguan 已提交
75
}
S
slguan 已提交
76 77 78
 
void *dnodeGetVnode(int vgId) {
  SVnodeObj *pVnode;
S
slguan 已提交
79

S
slguan 已提交
80
  // retrieve the pVnode from vgId
S
slguan 已提交
81

S
slguan 已提交
82 83 84 85 86
  
  // if (pVnode->status == ....) {
  //   terrno = ;
  //   return NULL;
  // }
S
slguan 已提交
87

S
slguan 已提交
88 89
  atomic_add_fetch_32(&pVnode->refCount, 1);
  return pVnode;
S
slguan 已提交
90 91
}

S
slguan 已提交
92 93 94
int dnodeGetVnodeStatus(void *pVnode) {
  return ((SVnodeObj *)pVnode)->status;
}
S
slguan 已提交
95

S
slguan 已提交
96 97 98 99 100 101 102 103 104 105
void *dnodeGetVnodeWworker(void *pVnode) {
  return ((SVnodeObj *)pVnode)->wworker;
}
 
void *dnodeGetVnodeRworker(void *pVnode) {
  return ((SVnodeObj *)pVnode)->rworker;
}
 
void *dnodeGetVnodeWal(void *pVnode) {
  return ((SVnodeObj *)pVnode)->wal;
S
slguan 已提交
106 107
}

S
slguan 已提交
108 109
void *dnodeGetVnodeTsdb(void *pVnode) {
  return ((SVnodeObj *)pVnode)->tsdb;
S
slguan 已提交
110
}
S
#1177  
slguan 已提交
111

S
slguan 已提交
112 113
void dnodeReleaseVnode(void *param) {
  SVnodeObj *pVnode = (SVnodeObj *)param;
S
slguan 已提交
114

S
slguan 已提交
115 116
  int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
  if (refCount == 0) dnodeRemoveVnode(pVnode);
S
slguan 已提交
117 118
}

S
slguan 已提交
119 120
static int dnodeOpenVnode() {
  SVnodeObj *pVnode;
S
slguan 已提交
121

S
slguan 已提交
122
  // create tsdb
S
slguan 已提交
123

S
slguan 已提交
124
  // create wal
S
slguan 已提交
125

S
slguan 已提交
126 127
  // allocate write worker
  pVnode->wworker = dnodeAllocateWriteWorker(); 
H
hzcheng 已提交
128

S
slguan 已提交
129 130
  // create read queue
  pVnode->rworker = dnodeAllocateReadWorker(); 
S
slguan 已提交
131

S
slguan 已提交
132
  // create the replica 
S
slguan 已提交
133

S
slguan 已提交
134
  // set the status
H
hzcheng 已提交
135

S
slguan 已提交
136
  pVnode->refCount = 1;
S
slguan 已提交
137

S
slguan 已提交
138
  return 0;
H
hzcheng 已提交
139 140
}

S
slguan 已提交
141 142 143
static int dnodeOpenVnodes() {
  return 0;
}
H
hzcheng 已提交
144

S
slguan 已提交
145
static void dnodeCleanupVnode() {
S
slguan 已提交
146

H
hzcheng 已提交
147 148
}

S
slguan 已提交
149
static void dnodeCleanupVnodes() {
H
hzcheng 已提交
150

S
slguan 已提交
151 152
}

S
slguan 已提交
153
static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) {
S
slguan 已提交
154

S
slguan 已提交
155 156 157 158 159 160 161 162
  SVnodeObj *pVnode = malloc(sizeof(SVnodeObj));
  
  // save the vnode info in non-volatile storage
  
  // add into hash, so it can be retrieved
  dnodeOpenVnode(pVnode);

  return 0;
H
hzcheng 已提交
163 164
}

S
slguan 已提交
165 166 167 168 169 170 171 172 173 174 175 176 177
static void dnodeRemoveVnode(SVnodeObj *pVnode) {
  
  // remove replica

  // remove read queue
  dnodeFreeReadWorker(pVnode->rworker);

  // remove write queue
  dnodeFreeWriteWorker(pVnode->wworker);
 
  // remove wal

  // remove tsdb
S
slguan 已提交
178

S
slguan 已提交
179
}
H
hzcheng 已提交
180

S
slguan 已提交
181 182 183
static int dnodeDropVnode(SVnodeObj *pVnode) {
  
  int count = atomic_sub_fetch_32(&pVnode->refCount, 1);
S
slguan 已提交
184

S
slguan 已提交
185 186 187
  if (count<=0) dnodeRemoveVnode(pVnode);

  return 0;
H
hzcheng 已提交
188 189
}

S
slguan 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {

//  SVnodeObj  *pVnode;
//  int         vgId;
//  SVPeersMsg *pCfg;
  
  // check everything, if not ok, set terrno;


  // everything is ok

//  dnodeCreateVnode(vgId, pCfg);

  //if (pVnode == NULL) terrno = TSDB_CODE
S
slguan 已提交
204 205
}

S
slguan 已提交
206
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
H
hzcheng 已提交
207

S
slguan 已提交
208 209 210 211 212 213 214 215 216 217
  SVnodeObj *pVnode;
  int        vgId;
  
  // check everything, if not ok, set terrno;


  // everything is ok
  dnodeDropVnode(pVnode);

  //if (pVnode == NULL) terrno = TSDB_CODE
H
hzcheng 已提交
218 219
}

S
slguan 已提交
220 221 222 223 224 225 226 227 228 229
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) {

  SVnodeObj *pVnode;
  int        vgId;
  
  // check everything, if not ok, set terrno;


  // everything is ok
//  dnodeAlterVnode(pVnode);
S
#1177  
slguan 已提交
230

S
slguan 已提交
231
  //if (pVnode == NULL) terrno = TSDB_CODE
S
slguan 已提交
232
}
S
slguan 已提交
233