mndSync.c 7.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

20
// Open the mnode WAL located at "<pMnode->path><TD_DIRSEP>wal" and keep the
// handle in pMnode->syncMgmt.pWal.
//
// Returns 0 when walOpen() hands back a handle, -1 when it returns NULL.
static int32_t mndInitWal(SMnode *pMnode) {
  char walDir[PATH_MAX] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMnode->syncMgmt.pWal = walOpen(walDir, &walCfg);
  return (pMnode->syncMgmt.pWal != NULL) ? 0 : -1;
}

// Close the WAL handle held in pMnode->syncMgmt, if any, and reset it to NULL
// so that calling this function again is a no-op.
static void mndCloseWal(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;
  if (pSyncMgmt->pWal == NULL) return;

  walClose(pSyncMgmt->pWal);
  pSyncMgmt->pWal = NULL;
}

// WAL restore is currently a no-op that always reports success (0).
//
// The previous implementation, which replayed WAL entries into the sdb, is
// kept below under "#if 0" for reference only; it is never compiled.
static int32_t mndRestoreWal(SMnode *pMnode) {
  // nothing to restore
  return 0;

#if 0

  SWal   *pWal = pMnode->syncMgmt.pWal;
  SSdb   *pSdb = pMnode->pSdb;
  int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
  int32_t code = -1;

  SWalReadHandle *pHandle = walOpenReadHandle(pWal);
  if (pHandle == NULL) return -1;

  int64_t first = walGetFirstVer(pWal);
  int64_t last = walGetLastVer(pWal);
  mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);

  first = TMAX(lastSdbVer + 1, first);
  for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
    if (walReadWithHandle(pHandle, ver) < 0) {
      mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
      goto _OVER;
    }

    SWalHead *pHead = pHandle->pHead;
    int64_t   sdbVer = sdbUpdateVer(pSdb, 0);
    if (sdbVer + 1 != ver) {
      terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
      mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
      goto _OVER;
    }

    mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
    if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
      mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
      goto _OVER;
    }

    sdbUpdateVer(pSdb, 1);
    mDebug("ver:%" PRId64 ", is restored", ver);
  }

  int64_t sdbVer = sdbUpdateVer(pSdb, 0);
  mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);

  mndTransPullup(pMnode);
  sdbVer = sdbUpdateVer(pSdb, 0);
  mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);

  if (sdbVer != lastSdbVer) {
    mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
    if (sdbWriteFile(pSdb) != 0) {
      goto _OVER;
    }

    if (walCommit(pWal, sdbVer) != 0) {
      goto _OVER;
    }

    if (walBeginSnapshot(pWal, sdbVer) < 0) {
      goto _OVER;
    }

    if (walEndSnapshot(pWal) < 0) {
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  walCloseReadHandle(pHandle);
  return code;

#endif
}

126
// Enqueue pMsg on SYNC_QUEUE through the message callbacks in msgcb.
// Returns whatever tmsgPutToQueue() returns.
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return ret;
}
M
Minghao Li 已提交
127

128
// Send pMsg as a request to the endpoint set pEpSet.
// Returns whatever tmsgSendReq() returns.
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  return ret;
}
M
Minghao Li 已提交
129

130
// FSM commit callback: apply a committed message to the mnode.
//
// Entries whose index is not greater than the snapshot's lastApplyIndex are
// skipped. For every applied entry the sdb version is advanced by one, and
// when cbMeta.state is TAOS_SYNC_STATE_LEADER the syncSem semaphore is posted
// (mndSyncPropose() waits on that semaphore after a successful propose).
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SyncIndex lastApplied = SYNC_INDEX_INVALID;
  if (pFsm->FpGetSnapshot != NULL) {
    SSnapshot snap = {0};
    pFsm->FpGetSnapshot(pFsm, &snap);
    lastApplied = snap.lastApplyIndex;
  }

  // Already covered by the snapshot: nothing to apply.
  if (!(cbMeta.index > lastApplied)) return;

  SMnode *pMnode = pFsm->data;

  // The const qualifier is dropped because the apply path needs the owning
  // node recorded in the message itself.
  SRpcMsg *pApply = (SRpcMsg *)pMsg;
  pApply->info.node = pFsm->data;
  mndProcessApplyMsg(pApply);
  sdbUpdateVer(pMnode->pSdb, 1);

  if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
    tsem_post(&pMnode->syncMgmt.syncSem);
  }
}

153
// FSM pre-commit callback. Intentionally empty: with strict consistency
// nothing is done before commit.
void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

157
// FSM rollback callback. Intentionally empty: with strict consistency there
// is nothing to roll back.
void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

161
// FSM snapshot callback. Currently a placeholder: *pSnapshot is left exactly
// as the caller passed it in, and 0 is always returned.
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  (void)pFsm;
  (void)pSnapshot;
  return 0;
}

166 167
// Allocate a zero-initialised SSyncFSM bound to pMnode and wire in the mnode
// commit / pre-commit / rollback / snapshot callbacks.
//
// Returns the new FSM, or NULL when the allocation fails (previously a failed
// allocation was dereferenced). The function itself never frees the object.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
  pFsm->FpRollBackCb = mndSyncRollBackMsg;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  return pFsm;
}

// Initialise the mnode sync layer: the syncSem semaphore, the WAL, and the
// sync instance built from the mnode's replica list.
//
// Returns 0 on success and -1 when the WAL cannot be opened, the FSM cannot
// be created, or syncOpen() does not return a positive handle. The syncOpen()
// result used to be guarded only by ASSERT; it is now an explicit error path.
//
// NOTE(review): on the error paths the semaphore, any WAL already opened and
// the FSM are left in place; this presumes the caller runs mndCleanupSync()
// after a failed init, and FSM ownership after a failed syncOpen() is not
// visible here — confirm.
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  tsem_init(&pMgmt->syncSem, 0, 0);

  if (mndInitWal(pMnode) < 0) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  SSyncInfo syncInfo = {.vgId = 1};
  SSyncCfg *pCfg = &(syncInfo.syncCfg);
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i];
    tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn));
    pNodeInfo->nodePort = pMnode->replicas[i].port;
  }
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;

  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    mError("failed to create sync fsm");
    return -1;
  }
  syncInfo.FpSendMsg = mndSyncSendMsg;
  syncInfo.FpEqMsg = mndSyncEqMsg;

  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  return 0;
}

// Tear down what mndInitSync() set up locally: destroy the syncSem semaphore,
// then close the WAL.
void mndCleanupSync(SMnode *pMnode) {
  tsem_destroy(&pMnode->syncMgmt.syncSem);
  mndCloseWal(pMnode);
}

// Apply callback: clear the sync error code of the mnode passed in pData and
// post syncSem. The fsm, index and buf arguments are not used. Always
// returns 0.
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
  SMnode *pNode = pData;

  pNode->syncMgmt.errCode = 0;
  tsem_post(&pNode->syncMgmt.syncSem);

  return 0;
}
221 222

// Propose the raw sdb record pRaw to the sync layer and, when the propose is
// accepted, block on syncSem until it is posted.
//
// Returns:
//   pMgmt->errCode when syncPropose() returned 0 (after waiting on syncSem);
//   -1 when the message buffer cannot be allocated;
//   -1 with terrno = TSDB_CODE_APP_NOT_READY when this node is not leader;
//   the syncPropose() code with terrno = TSDB_CODE_SYN_INTERNAL_ERROR on
//   TAOS_SYNC_PROPOSE_OTHER_ERROR.
//
// The "#if 0" branch is the former direct-to-WAL path, kept for reference
// only; it is never compiled.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
#if 0
  SWal *pWal = pMnode->syncMgmt.pWal;
  SSdb *pSdb = pMnode->pSdb;

  int64_t ver = sdbUpdateVer(pSdb, 1);
  if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
    sdbUpdateVer(pSdb, -1);
    mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
    return -1;
  }

  mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
  walCommit(pWal, ver);
  walFsync(pWal, true);

  return 0;

#else

  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->errCode = 0;

  SRpcMsg rsp = {0};
  rsp.code = TDMT_MND_APPLY_MSG;
  rsp.contLen = sdbGetRawTotalSize(pRaw);
  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    // Without this check the memcpy below would write through a NULL pointer.
    // NOTE(review): terrno is presumably set by rpcMallocCont — confirm.
    mError("failed to propose raw:%p since rpc cont allocation failed", pRaw);
    return -1;
  }
  memcpy(rsp.pCont, pRaw, rsp.contLen);

  bool    isWeak = false;
  int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
  if (code == 0) {
    // Wait until syncSem is posted (done by the commit callback on the leader).
    tsem_wait(&pMgmt->syncSem);
  } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
    terrno = TSDB_CODE_APP_NOT_READY;
    mError("failed to propose raw:%p since not leader", pRaw);
    return -1;
  } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    mError("failed to propose raw:%p since sync internal error", pRaw);
  } else {
    assert(0);
  }

  if (code != 0) return code;
  return pMgmt->errCode;
#endif
}

S
Shengliang Guan 已提交
271
// Refresh pMnode->syncMgmt.state from syncGetMyRole() and report whether the
// refreshed state is TAOS_SYNC_STATE_LEADER.
bool mndIsMaster(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  pSyncMgmt->state = syncGetMyRole(pSyncMgmt->sync);
  if (pSyncMgmt->state != TAOS_SYNC_STATE_LEADER) {
    return false;
  }
  return true;
}