/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "mndTrans.h"

/*
 * Open the mnode's write-ahead log in "<pMnode->path><TD_DIRSEP>wal" and
 * store the handle in pMnode->syncMgmt.pWal.
 *
 * Returns 0 on success, -1 if walOpen() fails (pWal is then NULL).
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walDir[PATH_MAX] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  // Designated initializers: unlisted SWalCfg fields are zero.
  SWalCfg walCfg = {
      .vgId = 1,
      .level = TAOS_WAL_FSYNC,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
  };

  SWal *pWal = walOpen(walDir, &walCfg);
  pMnode->syncMgmt.pWal = pWal;
  return (pWal != NULL) ? 0 : -1;
}

/*
 * Close the mnode WAL if one is open and clear the stored handle.
 * Calling it again after a close is a no-op because the handle is reset to NULL.
 */
static void mndCloseWal(SMnode *pMnode) {
  SWal **ppWal = &pMnode->syncMgmt.pWal;
  if (*ppWal == NULL) return;

  walClose(*ppWal);
  *ppWal = NULL;
}

/*
 * Replay WAL entries newer than the sdb version into the sdb.
 *
 * Currently a stub: it always returns 0 and does not touch pMnode. The former
 * replay implementation is kept below under `#if 0` for reference.
 * NOTE(review): presumably replay is now expected to arrive through the sync
 * layer's commit callback instead of a direct WAL scan -- confirm before
 * deleting the disabled code.
 */
static int32_t mndRestoreWal(SMnode *pMnode) {
  (void)pMnode;
  return 0;

#if 0
  SWal   *pWal = pMnode->syncMgmt.pWal;
  SSdb   *pSdb = pMnode->pSdb;
  int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
  int32_t code = -1;

  SWalReadHandle *pHandle = walOpenReadHandle(pWal);
  if (pHandle == NULL) return -1;

  int64_t first = walGetFirstVer(pWal);
  int64_t last = walGetLastVer(pWal);
  mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);

  // Skip entries that the sdb already contains.
  first = TMAX(lastSdbVer + 1, first);
  for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
    if (walReadWithHandle(pHandle, ver) < 0) {
      mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
      goto _OVER;
    }

    SWalHead *pHead = pHandle->pHead;
    int64_t   sdbVer = sdbUpdateVer(pSdb, 0);
    if (sdbVer + 1 != ver) {
      terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
      mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
      goto _OVER;
    }

    mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
    if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
      mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
      goto _OVER;
    }

    sdbUpdateVer(pSdb, 1);
    mDebug("ver:%" PRId64 ", is restored", ver);
  }

  int64_t sdbVer = sdbUpdateVer(pSdb, 0);
  mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);

  mndTransPullup(pMnode);
  sdbVer = sdbUpdateVer(pSdb, 0);
  mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);

  if (sdbVer != lastSdbVer) {
    mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
    if (sdbWriteFile(pSdb) != 0) {
      goto _OVER;
    }

    if (walCommit(pWal, sdbVer) != 0) {
      goto _OVER;
    }

    if (walBeginSnapshot(pWal, sdbVer) < 0) {
      goto _OVER;
    }

    if (walEndSnapshot(pWal) < 0) {
      goto _OVER;
    }
  }

  code = 0;

_OVER:
  walCloseReadHandle(pHandle);
  return code;
#endif
}

/* Enqueue a sync-layer message on SYNC_QUEUE; returns tmsgPutToQueue()'s result unchanged. */
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  const int32_t rc = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return rc;
}

/* Send a sync-layer request to pEpSet; returns tmsgSendReq()'s result unchanged. */
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t rc = tmsgSendReq(pEpSet, pMsg);
  return rc;
}

/*
 * Sync-layer commit callback: apply a committed log entry to the mnode.
 *
 * Entries whose index is not greater than the FSM snapshot's lastApplyIndex
 * are skipped. When cbMeta.state is TAOS_SYNC_STATE_LEADER, syncSem is posted
 * after the entry is applied (mndSyncPropose waits on that semaphore).
 *
 * pFsm->data is expected to be the owning SMnode.
 */
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SyncIndex beginIndex = SYNC_INDEX_INVALID;

  if (pFsm->FpGetSnapshot != NULL) {
    // Pre-set lastApplyIndex so that a snapshot callback which fails or does
    // not fill the struct cannot leave us reading an indeterminate value.
    SSnapshot snapshot = {0};
    snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
    if (pFsm->FpGetSnapshot(pFsm, &snapshot) == 0) {
      beginIndex = snapshot.lastApplyIndex;
    }
  }

  // Already covered by the snapshot: nothing to apply.
  if (cbMeta.index <= beginIndex) return;

  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  // NOTE(review): mndProcessApplyMsg takes a non-const SRpcMsg*; confirm it does not modify *pMsg.
  mndProcessApplyMsg((SRpcMsg *)pMsg);

  if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
    tsem_post(&pMgmt->syncSem);
  }
}

/*
 * Sync-layer pre-commit callback. Deliberately a no-op: under strict
 * consistency the mnode applies entries only in the commit callback.
 */
void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

/*
 * Sync-layer rollback callback. Deliberately a no-op: under strict
 * consistency nothing is applied before commit, so there is nothing to undo.
 */
void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

/*
 * FSM snapshot callback.
 *
 * Snapshots are not implemented yet. This reports "nothing applied from a
 * snapshot" by setting lastApplyIndex to SYNC_INDEX_INVALID, the same value
 * mndSyncCommitMsg uses when no snapshot callback is installed. Previously the
 * struct was left unwritten, so callers read an indeterminate lastApplyIndex.
 * Other SSnapshot fields are left untouched.
 *
 * Returns 0 on success, -1 if pSnapshot is NULL.
 */
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  (void)pFsm;
  if (pSnapshot == NULL) return -1;

  pSnapshot->lastApplyIndex = SYNC_INDEX_INVALID;
  return 0;
}

/*
 * Allocate the sync-layer state machine for this mnode and wire up the
 * commit / pre-commit / rollback / snapshot callbacks defined in this file.
 *
 * Returns the new FSM (with pFsm->data == pMnode), or NULL if allocation fails.
 * The old code dereferenced the allocation result unchecked.
 * NOTE(review): who frees the FSM once handed to syncOpen() is not visible here -- confirm.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
  pFsm->FpRollBackCb = mndSyncRollBackMsg;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  return pFsm;
}

/*
 * Initialise mnode replication.
 *
 * Steps: create syncSem, open the WAL, mark node 1 as leader, build the sync
 * configuration from pMnode->replicas, and open the sync node.
 *
 * Returns 0 on success. Returns -1 if the WAL cannot be opened, the FSM cannot
 * be allocated, or syncOpen() fails.
 * NOTE(review): on failure the semaphore/WAL are not released here; presumably
 * the caller runs mndCleanupSync -- confirm.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  tsem_init(&pMgmt->syncSem, 0, 0);

  if (mndInitWal(pMnode) < 0) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  if (pMnode->selfId == 1) {
    pMgmt->state = TAOS_SYNC_STATE_LEADER;
  }

  SSyncInfo syncInfo = {.vgId = 1};
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    // One nodeInfo slot per replica. The previous code wrote through
    // (pCfg->nodeInfo)->..., i.e. always slot 0, so only the last replica survived.
    snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", pMnode->replicas[i].fqdn);
    pCfg->nodeInfo[i].nodePort = pMnode->replicas[i].port;
  }
  // Same separator convention as the WAL directory in mndInitWal.
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;

  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    mError("failed to create sync fsm");
    return -1;
  }
  syncInfo.FpSendMsg = mndSyncSendMsg;
  syncInfo.FpEqMsg = mndSyncEqMsg;

  // A failed open used to be caught only by ASSERT; report it to the caller instead.
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  return 0;
}

/*
 * Release the resources set up by mndInitSync: destroy syncSem, then close the WAL.
 * NOTE(review): the sync node opened by mndInitSync is not closed here -- confirm
 * whether it must be stopped before the semaphore is destroyed.
 */
void mndCleanupSync(SMnode *pMnode) {
  tsem_destroy(&pMnode->syncMgmt.syncSem);
  mndCloseWal(pMnode);
}

/*
 * Apply callback: record success (errCode = 0) and wake the thread waiting on
 * syncSem. pData is expected to be the owning SMnode; the fsm, index and buf
 * arguments are ignored. No caller is visible in this file.
 *
 * Always returns 0.
 */
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
  (void)fsm;
  (void)index;
  (void)buf;

  SSyncMgmt *pSync = &((SMnode *)pData)->syncMgmt;
  pSync->errCode = 0;
  tsem_post(&pSync->syncSem);
  return 0;
}

/*
 * Replicate an sdb raw record through the sync layer and wait for it to be applied.
 *
 * With a single replica this returns 0 immediately without proposing anything.
 * Otherwise the raw record is copied into an RPC message and proposed. The
 * message type TDMT_MND_APPLY_MSG is stored in the `code` field, as before.
 * The caller then blocks on syncSem until the commit callback posts it.
 *
 * Returns one of:
 *   - pMgmt->errCode after the wait;
 *   - syncPropose()'s non-zero code if the proposal is rejected;
 *   - -1 if the message buffer cannot be allocated.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
  if (pMnode->replica == 1) return 0;

  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->errCode = 0;

  // Zero-initialise: fields not set below were previously passed on indeterminate.
  SRpcMsg rpcMsg = {0};
  rpcMsg.code = TDMT_MND_APPLY_MSG;
  rpcMsg.contLen = sdbGetRawTotalSize(pRaw);
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    return -1;
  }
  memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen);

  // NOTE(review): pCont is never released here. If syncPropose copies the
  // message instead of taking ownership, this leaks one buffer per proposal --
  // confirm and free it after the call if so.
  const bool isWeak = false;
  int32_t    code = syncPropose(pMgmt->sync, &rpcMsg, isWeak);
  if (code != 0) return code;

  tsem_wait(&pMgmt->syncSem);
  return pMgmt->errCode;
}

/*
 * Return true when this mnode's recorded sync state is TAOS_SYNC_STATE_LEADER.
 * NOTE(review): within this file the state is only set (to leader) in
 * mndInitSync when selfId == 1 -- confirm where role changes are recorded.
 */
bool mndIsMaster(SMnode *pMnode) {
  const SSyncMgmt *pSync = &pMnode->syncMgmt;
  return TAOS_SYNC_STATE_LEADER == pSync->state;
}