syncEnv.c 4.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncEnv.h"
S
Shengliang Guan 已提交
18
#include "tref.h"
M
Minghao Li 已提交
19

S
Shengliang Guan 已提交
20 21 22 23
// Process-wide sync environment; zero-initialized here, populated by syncInit()
// and handed out to callers through syncEnv().
static SSyncEnv gSyncEnv = {0};
// Id of the ref set that tracks sync nodes; assigned from taosOpenRef() in syncInit()
// and reset to -1 by syncCleanUp(). -1 means "no ref set open".
static int32_t  gNodeRefId = -1;
// NOTE(review): not referenced anywhere in this file; presumably toggles verbose
// raft logging in other translation units -- confirm against its users.
bool            gRaftDetailLog = false;
// Forward declaration: syncInit() stores this callback before its definition appears.
static void     syncEnvTick(void *param, void *tmrId);
M
Minghao Li 已提交
24

S
Shengliang Guan 已提交
25
// Accessor for the file-scope sync environment object (gSyncEnv).
// The returned pointer refers to static storage and is never NULL.
SSyncEnv *syncEnv() {
  SSyncEnv *pEnv = &gSyncEnv;
  return pEnv;
}
M
Minghao Li 已提交
26

S
Shengliang Guan 已提交
27
// Reports whether the sync environment is currently marked as started,
// i.e. whether the atomically-read gSyncEnv.isStart flag is non-zero.
bool syncIsInit() {
  return atomic_load_8(&gSyncEnv.isStart) != 0;
}
28

S
Shengliang Guan 已提交
29 30
int32_t syncInit() {
  if (syncIsInit()) return 0;
31

32 33
  uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
  taosSeedRand(seed);
M
Minghao Li 已提交
34

S
Shengliang Guan 已提交
35 36 37 38 39 40
  memset(&gSyncEnv, 0, sizeof(SSyncEnv));
  gSyncEnv.envTickTimerCounter = 0;
  gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS;
  gSyncEnv.FpEnvTickTimer = syncEnvTick;
  atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0);
  atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0);
M
Minghao Li 已提交
41

S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49 50 51
  // start tmr thread
  gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
  atomic_store_8(&gSyncEnv.isStart, 1);

  gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
  if (gNodeRefId < 0) {
    sError("failed to init node ref");
    syncCleanUp();
    return -1;
  }
M
Minghao Li 已提交
52

S
Shengliang Guan 已提交
53 54
  sDebug("sync rsetId:%d is open", gNodeRefId);
  return 0;
M
Minghao Li 已提交
55
}
M
Minghao Li 已提交
56

S
Shengliang Guan 已提交
57 58 59 60
void syncCleanUp() {
  atomic_store_8(&gSyncEnv.isStart, 0);
  taosTmrCleanUp(gSyncEnv.pTimerManager);
  memset(&gSyncEnv, 0, sizeof(SSyncEnv));
M
Minghao Li 已提交
61

S
Shengliang Guan 已提交
62 63 64 65
  if (gNodeRefId != -1) {
    sDebug("sync rsetId:%d is closed", gNodeRefId);
    taosCloseRef(gNodeRefId);
    gNodeRefId = -1;
M
Minghao Li 已提交
66
  }
M
Minghao Li 已提交
67 68
}

S
Shengliang Guan 已提交
69 70 71
// Registers pNode in the node ref set and stores the resulting ref id in pNode->rid.
// Returns 0 when the node was added, -1 when taosAddRef() reported a negative id
// (pNode->rid then holds that negative value).
int64_t syncNodeAdd(SSyncNode *pNode) {
  pNode->rid = taosAddRef(gNodeRefId, pNode);

  if (pNode->rid >= 0) {
    sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
    return 0;
  }

  return -1;
}
76

S
Shengliang Guan 已提交
77 78 79
// Removes pNode's entry (pNode->rid) from the node ref set and logs the removal.
//
// The fields needed for the log line are copied before the removal:
// taosRemoveRef() may drop the last reference and run the close callback
// registered for the ref set (syncNodeClose), after which pNode must not be
// dereferenced.
void syncNodeRemove(SSyncNode *pNode) {
  const int32_t vgId = pNode->vgId;
  const int64_t rid = pNode->rid;

  taosRemoveRef(gNodeRefId, rid);
  sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", vgId, rid, gNodeRefId);
}

S
Shengliang Guan 已提交
82 83 84 85
// Looks up the sync node registered under `rid` in the node ref set.
// Returns the node on success; on failure a trace line is emitted and NULL is returned.
SSyncNode *syncNodeAcquire(int64_t rid) {
  SSyncNode *pSyncNode = taosAcquireRef(gNodeRefId, rid);
  if (pSyncNode != NULL) return pSyncNode;

  sTrace("failed to acquire node from refId:%" PRId64, rid);
  return NULL;
}

S
Shengliang Guan 已提交
91 92 93 94 95 96 97
// Releases the reference on pNode's ref-set entry (pNode->rid), the counterpart
// of syncNodeAcquire(). The result of taosReleaseRef() is not inspected.
void syncNodeRelease(SSyncNode *pNode) {
  taosReleaseRef(gNodeRefId, pNode->rid);
}

#if 0
void syncEnvStartTimer() {
  taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
               &gSyncEnv.pEnvTickTimer);
  atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser);
M
Minghao Li 已提交
98
}
M
Minghao Li 已提交
99

S
Shengliang Guan 已提交
100
// (Currently compiled out by the surrounding #if 0.)
// Stops the environment tick timer. The user-side logic clock is advanced first,
// so a tick callback that is already in flight sees it ahead of the timer-side
// clock and does not re-arm the timer (see syncEnvTick()). The timer handle is
// then stopped and cleared.
//
// The function is declared void, so it must not return a value; the previous
// unused `ret` local and `return ret;` have been dropped.
void syncEnvStopTimer() {
  atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1);
  taosTmrStop(gSyncEnv.pEnvTickTimer);
  gSyncEnv.pEnvTickTimer = NULL;
}
S
Shengliang Guan 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
#endif

static void syncEnvTick(void *param, void *tmrId) {
  SSyncEnv *pSyncEnv = param;
  if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
    gSyncEnv.envTickTimerCounter++;
    sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
           ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
           gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
           gSyncEnv.envTickTimerMS, tmrId);

    // do something, tick ...
    taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer);
  } else {
    sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
           ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
           gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
           gSyncEnv.envTickTimerMS, tmrId);
  }
}