syncEnv.c 5.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncEnv.h"
S
Shengliang Guan 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
M
Minghao Li 已提交
20

S
Shengliang Guan 已提交
21 22
// Process-wide sync environment; zero-initialized and reset by syncInit()/syncCleanUp().
static SSyncEnv gSyncEnv = {0};

// Id of the ref set holding sync nodes; -1 while the set is not open.
static int32_t gNodeRefId = -1;

// Id of the ref set holding heartbeat timer data; -1 while the set is not open.
static int32_t gHbDataRefId = -1;

// Timer callback installed into gSyncEnv.FpEnvTickTimer by syncInit().
static void syncEnvTick(void *param, void *tmrId);
M
Minghao Li 已提交
25

S
Shengliang Guan 已提交
26
// Returns the address of the file-scope sync environment singleton.
SSyncEnv *syncEnv() {
  return &gSyncEnv;
}
M
Minghao Li 已提交
27

S
Shengliang Guan 已提交
28
// Reports whether the isStart flag of the sync environment is currently non-zero.
bool syncIsInit() {
  return atomic_load_8(&gSyncEnv.isStart) != 0;
}
29

S
Shengliang Guan 已提交
30 31
int32_t syncInit() {
  if (syncIsInit()) return 0;
32

33 34
  uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
  taosSeedRand(seed);
M
Minghao Li 已提交
35

S
Shengliang Guan 已提交
36 37 38 39 40 41
  memset(&gSyncEnv, 0, sizeof(SSyncEnv));
  gSyncEnv.envTickTimerCounter = 0;
  gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS;
  gSyncEnv.FpEnvTickTimer = syncEnvTick;
  atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0);
  atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0);
M
Minghao Li 已提交
42

S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52
  // start tmr thread
  gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
  atomic_store_8(&gSyncEnv.isStart, 1);

  gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
  if (gNodeRefId < 0) {
    sError("failed to init node ref");
    syncCleanUp();
    return -1;
  }
M
Minghao Li 已提交
53

54 55 56 57 58 59 60
  gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
  if (gHbDataRefId < 0) {
    sError("failed to init hb-data ref");
    syncCleanUp();
    return -1;
  }

S
Shengliang Guan 已提交
61 62
  sDebug("sync rsetId:%d is open", gNodeRefId);
  return 0;
M
Minghao Li 已提交
63
}
M
Minghao Li 已提交
64

S
Shengliang Guan 已提交
65 66 67 68
void syncCleanUp() {
  atomic_store_8(&gSyncEnv.isStart, 0);
  taosTmrCleanUp(gSyncEnv.pTimerManager);
  memset(&gSyncEnv, 0, sizeof(SSyncEnv));
M
Minghao Li 已提交
69

S
Shengliang Guan 已提交
70 71 72 73
  if (gNodeRefId != -1) {
    sDebug("sync rsetId:%d is closed", gNodeRefId);
    taosCloseRef(gNodeRefId);
    gNodeRefId = -1;
M
Minghao Li 已提交
74
  }
75 76 77 78 79 80

  if (gHbDataRefId != -1) {
    sDebug("sync rsetId:%d is closed", gHbDataRefId);
    taosCloseRef(gHbDataRefId);
    gHbDataRefId = -1;
  }
M
Minghao Li 已提交
81 82
}

S
Shengliang Guan 已提交
83 84 85
// Registers pNode in the sync node ref set and stores the resulting id in
// pNode->rid.
//
// Returns the new rid, or -1 when taosAddRef yields a negative value
// (pNode->rid then holds that negative value).
int64_t syncNodeAdd(SSyncNode *pNode) {
  pNode->rid = taosAddRef(gNodeRefId, pNode);

  int64_t rid = pNode->rid;
  if (rid < 0) {
    return -1;
  }

  sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, rid, gNodeRefId);
  return rid;
}
M
Minghao Li 已提交
90

S
Shengliang Guan 已提交
91
// Removes the entry identified by rid from the sync node ref set.
// The result of taosRemoveRef is not inspected.
void syncNodeRemove(int64_t rid) {
  (void)taosRemoveRef(gNodeRefId, rid);
}
92

S
Shengliang Guan 已提交
93 94 95
SSyncNode *syncNodeAcquire(int64_t rid) {
  SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
  if (pNode == NULL) {
96
    sError("failed to acquire node from refId:%" PRId64, rid);
S
Shengliang Guan 已提交
97
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
L
Li Minghao 已提交
98
  }
S
Shengliang Guan 已提交
99 100

  return pNode;
M
Minghao Li 已提交
101 102
}

103 104
// Releases the reference on pNode in the sync node ref set.
// A NULL pNode is ignored.
void syncNodeRelease(SSyncNode *pNode) {
  if (pNode == NULL) {
    return;
  }

  taosReleaseRef(gNodeRefId, pNode->rid);
}

107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
// Registers pData in the heartbeat timer data ref set and stores the
// resulting id in pData->rid.
//
// Returns the new rid, or -1 when taosAddRef yields a negative value.
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
  pData->rid = taosAddRef(gHbDataRefId, pData);

  if (pData->rid >= 0) {
    return pData->rid;
  }
  return -1;
}

// Removes the entry identified by rid from the heartbeat timer data ref set.
// The result of taosRemoveRef is not inspected.
void syncHbTimerDataRemove(int64_t rid) {
  (void)taosRemoveRef(gHbDataRefId, rid);
}

// Looks up the heartbeat timer data registered under rid.
//
// Returns the data on success. When taosAcquireRef returns NULL, logs an
// error, sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR and returns NULL.
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
  SSyncHbTimerData *pHbData = taosAcquireRef(gHbDataRefId, rid);
  if (pHbData != NULL) {
    return pHbData;
  }

  sError("failed to acquire hb-timer-data from refId:%" PRId64, rid);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return NULL;
}

// Releases the reference on pData in the heartbeat timer data ref set.
// A NULL pData is ignored, matching the behavior of syncNodeRelease().
void syncHbTimerDataRelease(SSyncHbTimerData *pData) {
  if (pData == NULL) {
    return;
  }

  taosReleaseRef(gHbDataRefId, pData->rid);
}

S
Shengliang Guan 已提交
127 128 129 130 131
#if 0
// Disabled code, kept for reference: start/stop helpers for the environment
// tick timer.

// Re-arms the environment tick timer and copies the user logic clock into
// the timer logic clock.
void syncEnvStartTimer() {
  taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
               &gSyncEnv.pEnvTickTimer);
  atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser);
}

// Advances the user logic clock by one, stops the environment tick timer and
// clears its handle. The function is void, so it returns no status value.
void syncEnvStopTimer() {
  atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1);
  taosTmrStop(gSyncEnv.pEnvTickTimer);
  gSyncEnv.pEnvTickTimer = NULL;
}
#endif

// Timer callback for the environment tick timer.
// The whole body is compiled out, so the callback currently does nothing.
static void syncEnvTick(void *param, void *tmrId) {
#if 0
  SSyncEnv *pSyncEnv = param;

  // The user clock running ahead of the timer clock means the tick is skipped.
  if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) > atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
    sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
           ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
           gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
           gSyncEnv.envTickTimerMS, tmrId);
    return;
  }

  gSyncEnv.envTickTimerCounter++;
  sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
         ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
         gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
         gSyncEnv.envTickTimerMS, tmrId);

  // do something, tick ...
  taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer);
#endif
}