vnodeSync.c 4.4 KB
Newer Older
S
TD-2289  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "dnode.h"
S
TD-2289  
Shengliang Guan 已提交
21 22
#include "vnodeVersion.h"
#include "vnodeMain.h"
S
TD-2798  
Shengliang Guan 已提交
23
#include "vnodeStatus.h"
S
TD-2289  
Shengliang Guan 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86

uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while get file info", vgId);
    return 0;
  }

  *fver = pVnode->fversion;
  uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);

  vnodeRelease(pVnode);
  return ret;
}

int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while get wal info", vgId);
    return -1;
  }

  int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);

  vnodeRelease(pVnode);
  return code;
}

void vnodeNotifyRole(int32_t vgId, int8_t role) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vTrace("vgId:%d, vnode not found while notify role", vgId);
    return;
  }

  vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
  pVnode->role = role;
  dnodeSendStatusMsgToMnode();

  if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
    cqStart(pVnode->cq);
  } else {
    cqStop(pVnode->cq);
  }

  vnodeRelease(pVnode);
}

void vnodeCtrlFlow(int32_t vgId, int32_t level) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
    return;
  }

  if (pVnode->flowctrlLevel != level) {
    vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
    pVnode->flowctrlLevel = level;
  }

  vnodeRelease(pVnode);
}

S
TD-2798  
Shengliang Guan 已提交
87
void vnodeStartSyncFile(int32_t vgId) {
S
TD-2289  
Shengliang Guan 已提交
88 89
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
S
TD-2798  
Shengliang Guan 已提交
90 91 92 93
    vError("vgId:%d, vnode not found while start filesync", vgId);
    return;
  }

S
Shengliang Guan 已提交
94
  vInfo("vgId:%d, datafile will be synced", vgId);
S
TD-2798  
Shengliang Guan 已提交
95 96 97 98 99 100 101 102 103 104
  vnodeSetResetStatus(pVnode);

  vnodeRelease(pVnode);
}

void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while stop filesync", vgId);
    return;
S
TD-2289  
Shengliang Guan 已提交
105 106 107 108 109
  }

  pVnode->fversion = fversion;
  pVnode->version = fversion;
  vnodeSaveVersion(pVnode);
S
slguan 已提交
110
  walResetVersion(pVnode->wal, fversion);
S
TD-2289  
Shengliang Guan 已提交
111

S
slguan 已提交
112
  vInfo("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
S
TD-2798  
Shengliang Guan 已提交
113
  vnodeSetReadyStatus(pVnode);
S
TD-2289  
Shengliang Guan 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149

  vnodeRelease(pVnode);
}

void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
  void *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while confirm forward", vgId);
  }

  dnodeSendRpcVWriteRsp(pVnode, wparam, code);
  vnodeRelease(pVnode);
}

int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while write to cache", vgId);
    return TSDB_CODE_VND_INVALID_VGROUP_ID;
  }

  int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);

  vnodeRelease(pVnode);
  return code;
}

int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
  SVnodeObj *pVnode = vnodeAcquire(vgId);
  if (pVnode == NULL) {
    vError("vgId:%d, vnode not found while write to cache", vgId);
    return -1;
  }

  int32_t code = 0;
  if (pVnode->isCommiting) {
S
Shengliang Guan 已提交
150
    vInfo("vgId:%d, vnode is commiting while get version", vgId);
S
TD-2289  
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159
    code = -1;
  } else {
    *fver = pVnode->fversion;
    *wver = pVnode->version;
  }

  vnodeRelease(pVnode);
  return code;
}
S
TD-2289  
Shengliang Guan 已提交
160

S
TD-3308  
Shengliang Guan 已提交
161
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
S
TD-2289  
Shengliang Guan 已提交
162
  SVnodeObj *pVnode = vparam;
S
TD-3308  
Shengliang Guan 已提交
163
  syncConfirmForward(pVnode->sync, version, code, force);
164
}