mndCluster.c 10.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndCluster.h"
S
Shengliang Guan 已提交
18
#include "mndShow.h"
19
#include "mndTrans.h"
S
Shengliang Guan 已提交
20

21
/* Version stamped on encoded cluster rows; the decoder rejects any other value. */
#define CLUSTER_VER_NUMBE 1
/* Number of reserved bytes that follow the encoded fields of a cluster row. */
#define CLUSTER_RESERVE_SIZE 60
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26 27
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
static int32_t  mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t  mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
S
Shengliang Guan 已提交
28
static int32_t  mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
S
Shengliang Guan 已提交
29
static int32_t  mndCreateDefaultCluster(SMnode *pMnode);
S
Shengliang Guan 已提交
30
static int32_t  mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
31
static void     mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
H
Haojun Liao 已提交
32
static int32_t  mndProcessUptimeTimer(SRpcMsg *pReq);
S
Shengliang Guan 已提交
33 34

/*
 * Wires the cluster module into the mnode: installs the uptime-timer message
 * handler and the show-table retrieve / free-iterator handlers, then registers
 * the SDB_CLUSTER table callbacks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitCluster(SMnode *pMnode) {
  SSdbTable clusterTable = {0};

  clusterTable.sdbType = SDB_CLUSTER;
  clusterTable.keyType = SDB_KEY_INT64;
  clusterTable.deployFp = (SdbDeployFp)mndCreateDefaultCluster;
  clusterTable.encodeFp = (SdbEncodeFp)mndClusterActionEncode;
  clusterTable.decodeFp = (SdbDecodeFp)mndClusterActionDecode;
  clusterTable.insertFp = (SdbInsertFp)mndClusterActionInsert;
  clusterTable.updateFp = (SdbUpdateFp)mndClusterActionUpdate;
  clusterTable.deleteFp = (SdbDeleteFp)mndClusterActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_UPTIME_TIMER, mndProcessUptimeTimer);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);

  return sdbSetTable(pMnode->pSdb, clusterTable);
}

/* Intentionally a no-op. */
void mndCleanupCluster(SMnode *pMnode) { (void)pMnode; }

S
Shengliang Guan 已提交
55 56 57 58
/*
 * Copies the name of the cluster identified by pMnode->clusterId into
 * clusterName, bounded by len (via tstrncpy).
 *
 * Returns 0 on success, -1 when the cluster object cannot be acquired.
 */
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
  SSdb        *pSdb = pMnode->pSdb;
  SClusterObj *pObj = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);

  if (pObj != NULL) {
    tstrncpy(clusterName, pObj->name, len);
    sdbRelease(pSdb, pObj);
    return 0;
  }

  return -1;
}

H
Haojun Liao 已提交
68 69 70
/*
 * Acquires the first cluster object that sdbFetch() yields for SDB_CLUSTER.
 *
 * Returns the object, which the caller hands back through mndReleaseCluster(),
 * or NULL when the fetch yields nothing.
 */
static SClusterObj *mndAcquireCluster(SMnode *pMnode) {
  SSdb        *pSdb = pMnode->pSdb;
  SClusterObj *pCluster = NULL;

  void *pIter = sdbFetch(pSdb, SDB_CLUSTER, NULL, (void **)&pCluster);
  if (pIter == NULL) return NULL;

  // Only the first row is wanted, so iteration stops before it is exhausted.
  // sdbCancelFetch() is the call this file uses to dispose of an unfinished
  // sdbFetch() iterator; without it the iterator would simply be abandoned.
  sdbCancelFetch(pSdb, pIter);

  return pCluster;
}

/* Hands a cluster object obtained from mndAcquireCluster() back to the SDB. */
static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster) {
  sdbRelease(pMnode->pSdb, pCluster);
}

/* Returns the id of the cluster object, or 0 when no cluster object can be acquired. */
int64_t mndGetClusterId(SMnode *pMnode) {
  SClusterObj *pObj = mndAcquireCluster(pMnode);
  if (pObj == NULL) {
    return 0;
  }

  int64_t id = pObj->id;
  mndReleaseCluster(pMnode, pObj);
  return id;
}

C
Cary Xu 已提交
99
/* Returns the createdTime of the cluster object, or 0 when no cluster object can be acquired. */
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
  SClusterObj *pObj = mndAcquireCluster(pMnode);
  if (pObj == NULL) {
    return 0;
  }

  int64_t created = pObj->createdTime;
  mndReleaseCluster(pMnode, pObj);
  return created;
}

H
Haojun Liao 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
/*
 * Returns the uptime value stored on the cluster object.
 *
 * An alternative that added the time elapsed since updateTime
 * (taosGetTimestampSec() - updateTime / 1000) to the stored value was
 * compiled out in the original and is not part of the behavior.
 */
static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
  int32_t storedUpTime = pCluster->upTime;
  return storedUpTime;
}

/*
 * Returns the stored cluster uptime divided by 86400 (presumably seconds
 * converted to days; confirm against the unit of tsUptimeInterval), or 0
 * when no cluster object can be acquired.
 */
float mndGetClusterUpTime(SMnode *pMnode) {
  SClusterObj *pObj = mndAcquireCluster(pMnode);
  if (pObj == NULL) {
    return 0.0f;
  }

  int64_t rawUpTime = mndGetClusterUpTimeImp(pObj);
  mndReleaseCluster(pMnode, pObj);

  return rawUpTime / 86400.0f;
}

S
Shengliang Guan 已提交
131
/*
 * Serializes a cluster object into a newly allocated SDB raw record.
 *
 * Field order: id, createdTime, updateTime, name (TSDB_CLUSTER_ID_LEN bytes),
 * upTime, then CLUSTER_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL with terrno set when allocation or any
 * field write fails.
 */
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
  // Stays at OUT_OF_MEMORY until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("cluster:%" PRId64 ", encode to raw:%p, row:%p", pCluster->id, pRaw, pCluster);
    return pRaw;
  }

  mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

/*
 * Deserializes an SDB raw record into a newly allocated cluster row.
 *
 * The record's soft version must equal CLUSTER_VER_NUMBE; fields are read in
 * the order the encoder writes them (id, createdTime, updateTime, name,
 * upTime, reserved bytes).
 *
 * Returns the row, or NULL with terrno set on a version mismatch, allocation
 * failure, or field read failure.
 */
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
  // Declared and nulled before the first `goto _OVER`: the error path reads
  // both pointers, and a jump taken before their assignment must not leave
  // them indeterminate.
  SSdbRow     *pRow = NULL;
  SClusterObj *pCluster = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != CLUSTER_VER_NUMBE) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SClusterObj));
  if (pRow == NULL) goto _OVER;

  pCluster = sdbGetRowObj(pRow);
  if (pCluster == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    // pCluster is still NULL when the failure happened before the row object
    // was obtained; log id 0 in that case instead of dereferencing it.
    mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s",
           pCluster == NULL ? (int64_t)0 : pCluster->id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("cluster:%" PRId64 ", decode from raw:%p, row:%p", pCluster->id, pRaw, pCluster);
  return pRow;
}

/*
 * SDB insert callback: records the inserted cluster's id on the owning mnode
 * and stamps the row's updateTime with the current time in milliseconds.
 * Always returns 0.
 */
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
  SMnode *pMnode = pSdb->pMnode;

  mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster);

  pMnode->clusterId = pCluster->id;
  pCluster->updateTime = taosGetTimestampMs();

  return 0;
}

/* SDB delete callback: only traces the deletion. Always returns 0. */
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
  (void)pSdb;

  mTrace("cluster:%" PRId64 ", perform delete action, row:%p", pCluster->id, pCluster);

  return 0;
}

208
/*
 * SDB update callback: copies upTime from the new row into the existing row
 * and stamps the existing row's updateTime with the current time in
 * milliseconds. No other field is copied. Always returns 0.
 */
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
  (void)pSdb;

  // Traced before the copy so the "from" value is the pre-update uptime.
  mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld,
         pNew, pOld->upTime, pNew->upTime);

  pOld->upTime = pNew->upTime;
  pOld->updateTime = taosGetTimestampMs();

  return 0;
}

/*
 * SDB deploy callback: builds the default cluster object and submits it
 * through a transaction.
 *
 * The name comes from taosGetSystemUUID(), falling back to "tdengine3.0" when
 * that call fails. The id is generated from the name with mndGenerateUid(),
 * made non-negative, and also stored in pMnode->clusterId.
 *
 * Returns 0 on success, -1 on any failure.
 */
static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
  SClusterObj clusterObj = {0};
  clusterObj.createdTime = taosGetTimestampMs();
  clusterObj.updateTime = clusterObj.createdTime;

  int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
  if (code != 0) {
    strcpy(clusterObj.name, "tdengine3.0");
    mError("failed to get name from system, set to default val %s", clusterObj.name);
  }

  clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
  clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
  pMnode->clusterId = clusterObj.id;
  mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);

  SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
  if (pRaw == NULL) return -1;
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
  if (pTrans == NULL) {
    mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
    // pRaw has not been handed to any transaction yet, so nothing else will
    // free it on this path.
    sdbFreeRaw(pRaw);
    return -1;
  }
  mDebug("trans:%d, used to create cluster:%" PRId64, pTrans->id, clusterObj.id);

  if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
    mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
262 263
/*
 * Show-table retrieve handler: appends up to `rows` cluster rows to pBlock,
 * resuming from pShow->pIter.
 *
 * Columns are written in this order: id, name (as a varstr), uptime,
 * createdTime. pShow->numOfRows is increased by the number of rows written.
 *
 * Returns the number of rows written by this call.
 */
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pMsg->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t rowIdx = 0;

  for (; rowIdx < rows; ++rowIdx) {
    SClusterObj *pCluster = NULL;
    pShow->pIter = sdbFetch(pSdb, SDB_CLUSTER, pShow->pIter, (void **)&pCluster);
    if (pShow->pIter == NULL) break;

    int32_t          colIdx = 0;
    SColumnInfoData *pColInfo = NULL;

    // Column: id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, rowIdx, (const char *)&pCluster->id, false);

    // Column: name. colIdx already indexes the name column here, so its
    // schema entry supplies the maximum size for the varstr conversion.
    char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[colIdx].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, rowIdx, buf, false);

    // Column: uptime.
    int32_t upTime = mndGetClusterUpTimeImp(pCluster);
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, rowIdx, (const char *)&upTime, false);

    // Column: created time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    colDataAppend(pColInfo, rowIdx, (const char *)&pCluster->createdTime, false);

    sdbRelease(pSdb, pCluster);
  }

  pShow->numOfRows += rowIdx;
  return rowIdx;
}

S
Shengliang Guan 已提交
298 299 300
/* Show-table free-iterator handler: cancels an unfinished SDB fetch iterator. */
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
H
Haojun Liao 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338

/*
 * Handler for TDMT_MND_UPTIME_TIMER: adds tsUptimeInterval to the stored
 * cluster uptime and persists the result through a transaction.
 *
 * Works on a local copy of the cluster object so the acquired object is
 * released before the transaction is built.
 *
 * Returns 0 on success or when no cluster object is available (an error is
 * logged and the update is skipped); -1 when the transaction cannot be
 * created, appended to, or prepared.
 */
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SClusterObj  clusterObj = {0};
  SClusterObj *pCluster = mndAcquireCluster(pMnode);
  if (pCluster != NULL) {
    memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
    clusterObj.upTime += tsUptimeInterval;
    mndReleaseCluster(pMnode, pCluster);
  }

  // clusterObj stays zero-initialized when nothing was acquired, so id <= 0
  // covers the "no cluster" case.
  if (clusterObj.id <= 0) {
    mError("can't get cluster info while update uptime");
    return 0;
  }

  // upTime is handled as a 32-bit value elsewhere in this file (SDB_SET_INT32,
  // "%d"); widen it explicitly so the argument matches the PRId64 conversion.
  mTrace("update cluster uptime to %" PRId64, (int64_t)clusterObj.upTime);
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
  if (pTrans == NULL) return -1;

  SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
  if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }
  sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);

  if (mndTransPrepare(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    return -1;
  }

  mndTransDrop(pTrans);
  return 0;
}