mnodeSdb.c 21.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18
#include "taoserror.h"
19
#include "hash.h"
S
slguan 已提交
20 21
#include "tutil.h"
#include "tbalance.h"
S
slguan 已提交
22
#include "tqueue.h"
S
slguan 已提交
23
#include "twal.h"
S
slguan 已提交
24
#include "tsync.h"
S
slguan 已提交
25
#include "tglobal.h"
S
slguan 已提交
26
#include "dnode.h"
S
Shengliang Guan 已提交
27 28 29 30 31
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeMnode.h"
#include "mnodeDnode.h"
#include "mnodeSdb.h"
H
hzcheng 已提交
32

S
slguan 已提交
33 34 35 36 37 38 39 40 41
/* Kind of mutation carried by a WAL record (sdbWrite decodes it as msgType % 10). */
typedef enum {
  SDB_ACTION_INSERT = 0,
  SDB_ACTION_DELETE = 1,
  SDB_ACTION_UPDATE = 2
} ESdbAction;

/* Lifecycle state of the global sdb object. */
typedef enum {
  SDB_STATUS_OFFLINE = 0,
  SDB_STATUS_SERVING = 1,
  SDB_STATUS_CLOSING = 2
} ESdbStatus;

S
slguan 已提交
45
typedef struct _SSdbTable {
S
slguan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
  char      tableName[TSDB_DB_NAME_LEN + 1];
  ESdbTable tableId;
  ESdbKey   keyType;
  int32_t   hashSessions;
  int32_t   maxRowSize;
  int32_t   refCountPos;
  int32_t   autoIndex;
  int64_t   numOfRows;
  void *    iHandle;
  int32_t (*insertFp)(SSdbOper *pDesc);
  int32_t (*deleteFp)(SSdbOper *pOper);
  int32_t (*updateFp)(SSdbOper *pOper);
  int32_t (*decodeFp)(SSdbOper *pOper);
  int32_t (*encodeFp)(SSdbOper *pOper);
  int32_t (*destroyFp)(SSdbOper *pOper);
  int32_t (*restoredFp)();
S
slguan 已提交
62 63 64
  pthread_mutex_t mutex;
} SSdbTable;

S
slguan 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78
/* Process-wide sdb state: sync and WAL handles plus the registry of open tables. */
typedef struct {
  ESyncRole  role;         /* this mnode's current sync role */
  ESdbStatus status;
  int64_t    version;      /* last version written by sdbWrite */
  void *     sync;         /* handle from syncStart, NULL until started */
  void *     wal;          /* handle from walOpen */
  SSyncCfg   cfg;          /* last configuration applied by sdbUpdateSync */
  sem_t      sem;          /* posted by sdbConfirmForward, awaited by sdbForwardToPeer */
  int32_t    code;         /* result carried by the most recent forward confirmation */
  int32_t    numOfTables;
  SSdbTable *tableList[SDB_TABLE_MAX];
  pthread_mutex_t mutex;   /* serializes version assignment, WAL write and forwarding */
} SSdbObject;

S
slguan 已提交
79
/* Value stored in a table's hash: the live row object and its encoded size. */
typedef struct {
  int32_t rowSize;
  void   *row;
} SSdbRow;
H
hzcheng 已提交
83

S
slguan 已提交
84 85
/* The single sdb instance. Static storage duration, so every field starts zeroed. */
static SSdbObject tsSdbObj;

/* Defined further down; needed earlier as the WAL-restore and sync write callback. */
static int sdbWrite(void *param, void *data, int type);
S
slguan 已提交
86

S
slguan 已提交
87 88 89 90 91 92 93
int32_t sdbGetId(void *handle) {
  return ((SSdbTable *)handle)->autoIndex;
}

int64_t sdbGetNumOfRows(void *handle) {
  return ((SSdbTable *)handle)->numOfRows;
}
S
slguan 已提交
94

S
slguan 已提交
95
uint64_t sdbGetVersion() {
S
slguan 已提交
96 97 98 99 100
  return tsSdbObj.version;
}

/* True when this mnode currently holds the master sync role. */
bool sdbIsMaster() {
  bool isMaster = (tsSdbObj.role == TAOS_SYNC_ROLE_MASTER);
  return isMaster;
}

S
slguan 已提交
103 104 105 106
/* True once sdbInit has completed and before sdbCleanUp starts closing. */
bool sdbIsServing() {
  bool serving = (tsSdbObj.status == SDB_STATUS_SERVING);
  return serving;
}

107 108 109 110 111 112 113 114
/* Resolves the hash key of a row object. For SDB_KEY_VAR_STRING tables the
 * object starts with a char pointer to the key string. For every other key
 * type the object pointer itself is used as the key address. */
static void *sdbGetObjKey(SSdbTable *pTable, void *key) {
  return (pTable->keyType == SDB_KEY_VAR_STRING) ? (void *)*(char **)key : key;
}

S
slguan 已提交
115 116 117 118 119 120 121 122 123 124 125
/* Human-readable name of an ESdbAction value, for log messages. */
static char *sdbGetActionStr(int32_t action) {
  if (action == SDB_ACTION_INSERT) return "insert";
  if (action == SDB_ACTION_DELETE) return "delete";
  if (action == SDB_ACTION_UPDATE) return "update";
  return "invalid";
}
S
slguan 已提交
126

127
/* Formats a key for logging.
 * String keys are returned as-is. Integer keys are rendered into a
 * function-static buffer, which every later call overwrites. */
static char *sdbGetKeyStr(SSdbTable *pTable, void *key) {
  static char str[16];
  ESdbKey     type = pTable->keyType;

  if (type == SDB_KEY_STRING || type == SDB_KEY_VAR_STRING) {
    return (char *)key;
  }

  if (type == SDB_KEY_INT || type == SDB_KEY_AUTO) {
    // an int32_t needs at most 11 characters plus the terminator
    snprintf(str, sizeof(str), "%d", *(int32_t *)key);
    return str;
  }

  return "invalid";
}

142 143 144 145
/* Log-friendly key of a row object: resolve the key, then format it. */
static char *sdbGetKeyStrFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetKeyStr(pTable, objKey);
}

S
slguan 已提交
146
static void *sdbGetTableFromId(int32_t tableId) {
S
slguan 已提交
147
  return tsSdbObj.tableList[tableId];
S
slguan 已提交
148 149
}

S
slguan 已提交
150
static int32_t sdbInitWal() {
H
hjxilinx 已提交
151
  SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1};
S
slguan 已提交
152 153 154
  char temp[TSDB_FILENAME_LEN];
  sprintf(temp, "%s/wal", tsMnodeDir);
  tsSdbObj.wal = walOpen(temp, &walCfg);
S
slguan 已提交
155 156
  if (tsSdbObj.wal == NULL) {
    sdbError("failed to open sdb wal in %s", tsMnodeDir);
H
hzcheng 已提交
157 158 159
    return -1;
  }

S
slguan 已提交
160
  sdbTrace("open sdb wal for restore");
S
slguan 已提交
161
  walRestore(tsSdbObj.wal, NULL, sdbWrite);
S
slguan 已提交
162 163
  return 0;
}
H
hzcheng 已提交
164

S
slguan 已提交
165
static void sdbRestoreTables() {
S
slguan 已提交
166 167
  int32_t totalRows = 0;
  int32_t numOfTables = 0;
S
slguan 已提交
168
  for (int32_t tableId = 0; tableId < SDB_TABLE_MAX; ++tableId) {
S
slguan 已提交
169 170
    SSdbTable *pTable = sdbGetTableFromId(tableId);
    if (pTable == NULL) continue;
S
slguan 已提交
171 172
    if (pTable->restoredFp) {
      (*pTable->restoredFp)();
H
hzcheng 已提交
173 174
    }

S
slguan 已提交
175 176
    totalRows += pTable->numOfRows;
    numOfTables++;
S
slguan 已提交
177 178 179 180 181 182 183 184 185 186 187 188
    sdbTrace("table:%s, is restored, numOfRows:%d", pTable->tableName, pTable->numOfRows);
  }

  sdbTrace("sdb is restored, version:%d totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables);
}

void sdbUpdateMnodeRoles() {
  if (tsSdbObj.sync == NULL) return;

  SNodesRole roles = {0};
  syncGetNodesRole(tsSdbObj.sync, &roles);

189
  sdbPrint("update mnodes sync roles, total:%d", tsSdbObj.cfg.replica);
S
slguan 已提交
190
  for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) {
191
    SMnodeObj *pMnode = mnodeGetMnode(roles.nodeId[i]);
S
slguan 已提交
192 193
    if (pMnode != NULL) {
      pMnode->role = roles.role[i];
194
      sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mnodeGetMnodeRoleStr(pMnode->role));
195
      if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbObj.role = pMnode->role;
196
      mnodeDecMnodeRef(pMnode);
S
slguan 已提交
197 198
    }
  }
199

200
  mnodeUpdateMnodeIpSet();
S
slguan 已提交
201 202
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203
/* Sync callback for file enumeration. Fills none of the out-parameters; it
 * only refreshes mnode roles and returns 0 (presumably "no file"; confirm
 * against the tsync contract). */
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
  (void)ahandle;
  (void)name;
  (void)index;
  (void)size;
  (void)fversion;

  sdbUpdateMnodeRoles();
  return 0;
}

static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) {
S
slguan 已提交
209
  return walGetWalFile(tsSdbObj.wal, name, index);
S
slguan 已提交
210 211 212
}

static void sdbNotifyRole(void *ahandle, int8_t role) {
213
  sdbPrint("mnode role changed from %s to %s", mnodeGetMnodeRoleStr(tsSdbObj.role), mnodeGetMnodeRoleStr(role));
S
slguan 已提交
214 215 216 217 218 219 220 221 222 223 224 225

  if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) {
    balanceReset();
  }
  tsSdbObj.role = role;

  sdbUpdateMnodeRoles();
}

/* Sync callback invoked when a forwarded write is confirmed.
 * Stores the result and wakes the writer blocked in sdbForwardToPeer.
 * param carries the record version, packed into the pointer by
 * sdbForwardToPeer. It is unpacked through uintptr_t, because a direct
 * pointer/int64 cast is implementation-defined and truncates where pointers
 * are 32-bit. */
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
  tsSdbObj.code = code;
  sem_post(&tsSdbObj.sem);
  sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (uint64_t)(uintptr_t)param, tstrerror(code));
}

S
slguan 已提交
229
/* Forwards a WAL record to peer mnodes. When the sync layer reports that the
 * request was sent (code > 0), blocks until sdbConfirmForward posts the
 * semaphore and returns the confirmed result. Returns success when sync is
 * not running. */
static int32_t sdbForwardToPeer(SWalHead *pHead) {
  if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;

  // the version travels as the opaque callback param; pack it via uintptr_t
  void   *param = (void *)(uintptr_t)pHead->version;
  int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, param, TAOS_QTYPE_RPC);
  if (code > 0) {
    sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
    sem_wait(&tsSdbObj.sem);
    return tsSdbObj.code;
  }
  return code;
}

void sdbUpdateSync() {
  SSyncCfg syncCfg = {0};
  int32_t index = 0;

S
slguan 已提交
245
  SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
S
slguan 已提交
246
  for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
S
slguan 已提交
247
    SDMMnodeInfo *node = &mnodes->nodeInfos[i];
S
slguan 已提交
248
    syncCfg.nodeInfo[i].nodeId = node->nodeId;
J
jtao1735 已提交
249 250
    taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
    syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
S
slguan 已提交
251 252 253 254
    index++;
  }

  if (index == 0) {
S
Shengliang Guan 已提交
255
    void *pIter = NULL;
S
slguan 已提交
256 257
    while (1) {
      SMnodeObj *pMnode = NULL;
258
      pIter = mnodeGetNextMnode(pIter, &pMnode);
S
slguan 已提交
259 260 261 262
      if (pMnode == NULL) break;

      syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;

263
      SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
264 265 266 267 268 269
      if (pDnode != NULL) {
        syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC;
        strcpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeEp);
        index++;
      }

270 271
      mnodeDecDnodeRef(pDnode);
      mnodeDecMnodeRef(pMnode);
S
slguan 已提交
272
    }
S
Shengliang Guan 已提交
273
    sdbFreeIter(pIter);
S
slguan 已提交
274 275 276
  }

  syncCfg.replica = index;
J
jtao1735 已提交
277
  syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
S
slguan 已提交
278 279 280 281 282 283 284 285 286 287 288 289

  bool hasThisDnode = false;
  for (int32_t i = 0; i < syncCfg.replica; ++i) {
    if (syncCfg.nodeInfo[i].nodeId == dnodeGetDnodeId()) {
      hasThisDnode = true;
      break;
    }
  }

  if (!hasThisDnode) return;
  if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;

J
jtao1735 已提交
290
  sdbPrint("work as mnode, replica:%d", syncCfg.replica);
S
slguan 已提交
291
  for (int32_t i = 0; i < syncCfg.replica; ++i) {
J
jtao1735 已提交
292
    sdbPrint("mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, syncCfg.nodeInfo[i].nodePort);
S
slguan 已提交
293
  }
S
slguan 已提交
294

guanshengliang's avatar
guanshengliang 已提交
295
  SSyncInfo syncInfo = {0};
S
slguan 已提交
296 297 298
  syncInfo.vgId = 1;
  syncInfo.version = sdbGetVersion();
  syncInfo.syncCfg = syncCfg;
S
slguan 已提交
299
  sprintf(syncInfo.path, "%s", tsMnodeDir);
S
slguan 已提交
300 301 302 303 304 305 306
  syncInfo.ahandle = NULL;
  syncInfo.getWalInfo = sdbGetWalInfo;
  syncInfo.getFileInfo = sdbGetFileInfo;
  syncInfo.writeToCache = sdbWrite;
  syncInfo.confirmForward = sdbConfirmForward; 
  syncInfo.notifyRole = sdbNotifyRole;
  tsSdbObj.cfg = syncCfg;
307
  
S
slguan 已提交
308 309 310 311 312
  if (tsSdbObj.sync) {
    syncReconfig(tsSdbObj.sync, &syncCfg);
  } else {
    tsSdbObj.sync = syncStart(&syncInfo);
  }
313
  sdbUpdateMnodeRoles();
S
slguan 已提交
314 315 316 317 318 319 320 321 322
}

int32_t sdbInit() {
  pthread_mutex_init(&tsSdbObj.mutex, NULL);
  sem_init(&tsSdbObj.sem, 0, 0);

  if (sdbInitWal() != 0) {
    return -1;
  }
S
slguan 已提交
323
  
S
slguan 已提交
324
  sdbRestoreTables();
S
slguan 已提交
325

326
  if (mnodeGetMnodesNum() == 1) {
S
slguan 已提交
327 328 329 330 331 332
    tsSdbObj.role = TAOS_SYNC_ROLE_MASTER;
  }

  sdbUpdateSync();

  tsSdbObj.status = SDB_STATUS_SERVING;
S
slguan 已提交
333
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
334 335
}

S
slguan 已提交
336
void sdbCleanUp() {
S
slguan 已提交
337 338
  if (tsSdbObj.status != SDB_STATUS_SERVING) return;

S
slguan 已提交
339
  tsSdbObj.status = SDB_STATUS_CLOSING;
guanshengliang's avatar
guanshengliang 已提交
340 341 342 343 344 345 346 347 348 349 350
  
  if (tsSdbObj.sync) {
    syncStop(tsSdbObj.sync);
    tsSdbObj.sync = NULL;
  }

  if (tsSdbObj.wal) {
    walClose(tsSdbObj.wal);
    tsSdbObj.wal = NULL;
  }
  
S
slguan 已提交
351 352
  sem_destroy(&tsSdbObj.sem);
  pthread_mutex_destroy(&tsSdbObj.mutex);
H
hzcheng 已提交
353 354
}

355 356 357 358 359 360
void sdbIncRef(void *handle, void *pObj) {
  if (pObj == NULL) return;

  SSdbTable *pTable = handle;
  int32_t *  pRefCount = (int32_t *)(pObj + pTable->refCountPos);
  atomic_add_fetch_32(pRefCount, 1);
S
Shengliang Guan 已提交
361
  if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) {
362
    sdbTrace("add ref to table:%s record:%s:%d", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
S
slguan 已提交
363 364 365
  }
}

366 367 368 369 370 371
void sdbDecRef(void *handle, void *pObj) {
  if (pObj == NULL) return;

  SSdbTable *pTable = handle;
  int32_t *  pRefCount = (int32_t *)(pObj + pTable->refCountPos);
  int32_t    refCount = atomic_sub_fetch_32(pRefCount, 1);
S
Shengliang Guan 已提交
372
  if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) {
373
    sdbTrace("def ref of table:%s record:%s:%d", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
S
slguan 已提交
374 375
  }

376 377 378 379 380 381 382
  int8_t *updateEnd = pObj + pTable->refCountPos - 1;
  if (refCount <= 0 && *updateEnd) {
    sdbTrace("table:%s, record:%s:%d is destroyed", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount);
    SSdbOper oper = {.pObj = pObj};
    (*pTable->destroyFp)(&oper);
  }
}
S
slguan 已提交
383

384 385
/* Looks up the hash entry for a key without touching ref counts.
 * String keys are hashed over strlen bytes, all others over 4 bytes.
 * Returns NULL for a NULL table or a missing key. */
static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) {
  if (pTable == NULL) return NULL;

  bool    isStrKey = (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING);
  int32_t keySize = isStrKey ? (int32_t)strlen((char *)key) : (int32_t)sizeof(int32_t);

  return taosHashGet(pTable->iHandle, key, keySize);
}
S
slguan 已提交
394

395 396
/* Hash entry for the key embedded in a row object. */
static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetRowMeta(pTable, objKey);
}

H
hzcheng 已提交
399 400
void *sdbGetRow(void *handle, void *key) {
  SSdbTable *pTable = (SSdbTable *)handle;
S
slguan 已提交
401
  SSdbRow * pMeta;
H
hzcheng 已提交
402 403 404 405

  if (handle == NULL) return NULL;

  pthread_mutex_lock(&pTable->mutex);
406 407

  int32_t keySize = sizeof(int32_t);
408
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
409 410 411 412
    keySize = strlen((char *)key);
  }
  pMeta = taosHashGet(pTable->iHandle, key, keySize);

S
slguan 已提交
413
  if (pMeta) sdbIncRef(pTable, pMeta->row);
H
hzcheng 已提交
414 415
  pthread_mutex_unlock(&pTable->mutex);

416
  if (pMeta == NULL) return NULL;
H
hzcheng 已提交
417 418 419 420

  return pMeta->row;
}

421 422 423 424
/* Referenced row matching the key embedded in a row object (see sdbGetRow). */
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetRow(pTable, objKey);
}

S
slguan 已提交
425 426
/* Adds pOper->pObj to the table hash and takes the table's reference on it.
 * Bumps numOfRows and advances autoIndex, then runs the table's insertFp.
 * numOfRows is int64_t, so it is logged with PRId64. */
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
  SSdbRow rowMeta = {.rowSize = pOper->rowSize, .row = pOper->pObj};

  pthread_mutex_lock(&pTable->mutex);

  void *  key = sdbGetObjKey(pTable, pOper->pObj);
  int32_t keySize = sizeof(int32_t);
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    keySize = strlen((char *)key);
  }

  taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));

  sdbIncRef(pTable, pOper->pObj);
  pTable->numOfRows++;

  if (pTable->keyType == SDB_KEY_AUTO) {
    // replayed rows carry their own id; keep the counter at the highest seen
    pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj));
  } else {
    pTable->autoIndex++;
  }

  pthread_mutex_unlock(&pTable->mutex);

  sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion());

  (*pTable->insertFp)(pOper);
  return TSDB_CODE_SUCCESS;
}
H
hzcheng 已提交
458

S
slguan 已提交
459
/* Runs deleteFp, removes the row from the table hash and decrements
 * numOfRows. It then sets the row's "removed" flag (the byte just before its
 * ref count) and drops the hash's reference, so the last sdbDecRef destroys
 * the object. */
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
  (*pTable->deleteFp)(pOper);

  pthread_mutex_lock(&pTable->mutex);

  void *  key = sdbGetObjKey(pTable, pOper->pObj);
  int32_t keySize = sizeof(int32_t);
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    keySize = strlen((char *)key);
  }

  taosHashRemove(pTable->iHandle, key, keySize);
  pTable->numOfRows--;

  pthread_mutex_unlock(&pTable->mutex);

  sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());

  // char* arithmetic: offsets on void* are a non-standard extension
  int8_t *updateEnd = (int8_t *)((char *)pOper->pObj + pTable->refCountPos - 1);
  *updateEnd = 1;
  sdbDecRef(pTable, pOper->pObj);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
486
/* Applies an update through the table's updateFp. The hash itself is left
 * unchanged. numOfRows is int64_t, so it is logged with PRId64. */
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
  sdbTrace("table:%s, update record:%s in hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());

  (*pTable->updateFp)(pOper);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
494 495 496 497
static int sdbWrite(void *param, void *data, int type) {
  SWalHead *pHead = data;
  int32_t   tableId = pHead->msgType / 10;
  int32_t   action = pHead->msgType % 10;
S
slguan 已提交
498

S
slguan 已提交
499 500
  SSdbTable *pTable = sdbGetTableFromId(tableId);
  assert(pTable != NULL);
S
slguan 已提交
501

S
slguan 已提交
502 503 504 505 506 507 508 509 510
  pthread_mutex_lock(&tsSdbObj.mutex);
  if (pHead->version == 0) {
     // assign version
    tsSdbObj.version++;
    pHead->version = tsSdbObj.version;
  } else {
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= tsSdbObj.version) {
      pthread_mutex_unlock(&tsSdbObj.mutex);
S
slguan 已提交
511 512 513 514
      if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
        sdbTrace("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
        syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
      }
S
slguan 已提交
515 516 517 518
      return TSDB_CODE_SUCCESS;
    } else if (pHead->version != tsSdbObj.version + 1) {
      pthread_mutex_unlock(&tsSdbObj.mutex);
      sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64,
519
               pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
S
slguan 已提交
520 521 522 523 524
               tsSdbObj.version);
      return TSDB_CODE_OTHERS;
    } else {
      tsSdbObj.version = pHead->version;
    }
S
slguan 已提交
525
  }
S
slguan 已提交
526

S
slguan 已提交
527
  int32_t code = walWrite(tsSdbObj.wal, pHead);
S
slguan 已提交
528
  if (code < 0) {
S
slguan 已提交
529 530
    pthread_mutex_unlock(&tsSdbObj.mutex);
    return code;
S
slguan 已提交
531
  }
S
slguan 已提交
532
  walFsync(tsSdbObj.wal);
S
slguan 已提交
533

S
slguan 已提交
534
  code = sdbForwardToPeer(pHead);
S
slguan 已提交
535
  pthread_mutex_unlock(&tsSdbObj.mutex);
S
slguan 已提交
536

S
slguan 已提交
537
  // from app, oper is created
S
slguan 已提交
538 539 540 541 542 543
  if (param != NULL) {
    //sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
    return code;
  }
  
  // from wal or forward msg, oper not created, should add into hash
S
slguan 已提交
544
  if (tsSdbObj.sync != NULL) {
S
slguan 已提交
545
    sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code));
S
slguan 已提交
546 547
    syncConfirmForward(tsSdbObj.sync, pHead->version, code);
  }
S
slguan 已提交
548 549

  if (action == SDB_ACTION_INSERT) {
S
slguan 已提交
550
    SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
S
slguan 已提交
551
    code = (*pTable->decodeFp)(&oper);
S
slguan 已提交
552
    return sdbInsertHash(pTable, &oper);
S
slguan 已提交
553
  } else if (action == SDB_ACTION_DELETE) {
S
slguan 已提交
554
    SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
S
slguan 已提交
555
    assert(rowMeta != NULL && rowMeta->row != NULL);
S
slguan 已提交
556 557
    SSdbOper oper = {.table = pTable, .pObj = rowMeta->row};
    return sdbDeleteHash(pTable, &oper);
S
slguan 已提交
558
  } else if (action == SDB_ACTION_UPDATE) {
S
slguan 已提交
559
    SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
S
slguan 已提交
560
    assert(rowMeta != NULL && rowMeta->row != NULL);
S
slguan 已提交
561
    SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
S
slguan 已提交
562
    code = (*pTable->decodeFp)(&oper);
S
slguan 已提交
563 564
    return sdbUpdateHash(pTable, &oper);
  } else { return TSDB_CODE_INVALID_MSG_TYPE; }
S
slguan 已提交
565 566
}

S
slguan 已提交
567
/* Inserts a new row.
 * Fails with TSDB_CODE_ALREADY_THERE if the key already exists. For
 * SDB_KEY_AUTO tables the next id is assigned first. Global operations are
 * encoded, written to the WAL and forwarded through sdbWrite before the row
 * goes into the hash. Returns -1 on a NULL table or allocation failure. */
int32_t sdbInsertRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  void *pExisting = sdbGetRowFromObj(pTable, pOper->pObj);
  if (pExisting != NULL) {
    sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj));
    // release the reference sdbGetRowFromObj took on the existing row, not on the new object
    sdbDecRef(pTable, pExisting);
    return TSDB_CODE_ALREADY_THERE;
  }

  if (pTable->keyType == SDB_KEY_AUTO) {
    pthread_mutex_lock(&pTable->mutex);
    *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;

    // let vgId increase from 2
    if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
      *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
    }
    pthread_mutex_unlock(&pTable->mutex);
  }

  if (pOper->type == SDB_OPER_GLOBAL) {
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to insert record:%s, no memory for wal head", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;

    // encodeFp serializes into the WAL payload and reports the encoded size
    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbInsertHash(pTable, pOper);
}

S
slguan 已提交
607
/* Deletes an existing row.
 * Global operations write a key-only record (string keys include the NUL)
 * to the WAL through sdbWrite before the row is removed from the hash.
 * Returns -1 on a NULL table, a missing row, an unknown key type or
 * allocation failure. */
int32_t sdbDeleteRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
  if (pMeta == NULL) {
    sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
    return -1;
  }

  void *pMetaRow = pMeta->row;
  assert(pMetaRow != NULL);
  (void)pMetaRow;  // only inspected by the assert

  if (pOper->type == SDB_OPER_GLOBAL) {
    void *  key = sdbGetObjKey(pTable, pOper->pObj);
    int32_t keySize = 0;
    switch (pTable->keyType) {
      case SDB_KEY_STRING:
      case SDB_KEY_VAR_STRING:
        keySize = strlen((char *)key) + 1;
        break;
      case SDB_KEY_INT:
      case SDB_KEY_AUTO:
        keySize = sizeof(uint32_t);
        break;
      default:
        return -1;
    }

    int32_t   size = sizeof(SWalHead) + keySize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to delete record:%s, no memory for wal head", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;
    pHead->len = keySize;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
    memcpy(pHead->cont, key, keySize);

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbDeleteHash(pTable, pOper);
}

S
slguan 已提交
651
/* Updates an existing row.
 * Global operations encode the row and write it to the WAL through sdbWrite
 * before updateFp runs. Returns -1 on a NULL table, a missing row or
 * allocation failure. */
int32_t sdbUpdateRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
  if (pMeta == NULL) {
    // this is the update path; the old message wrongly said "delete failed"
    sdbTrace("table:%s, record is not there, update failed", pTable->tableName);
    return -1;
  }

  void *pMetaRow = pMeta->row;
  assert(pMetaRow != NULL);
  (void)pMetaRow;  // only inspected by the assert

  if (pOper->type == SDB_OPER_GLOBAL) {
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to update record:%s, no memory for wal head", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;

    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbUpdateHash(pTable, pOper);
}
S
slguan 已提交
681

S
slguan 已提交
682 683 684 685
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
  SSdbTable *pTable = (SSdbTable *)handle;
  *ppRow = NULL;
  if (pTable == NULL) return NULL;
H
hzcheng 已提交
686

687 688 689 690 691 692 693 694 695 696 697
  SHashMutableIterator *pIter = pNode;
  if (pIter == NULL) {
    pIter = taosHashCreateIter(pTable->iHandle);
  }

  if (!taosHashIterNext(pIter)) {
    taosHashDestroyIter(pIter);
    return NULL;
  }

  SSdbRow *pMeta = taosHashIterGet(pIter);
S
Shengliang Guan 已提交
698 699 700 701
  if (pMeta == NULL) {
    taosHashDestroyIter(pIter);
    return NULL;
  }
S
slguan 已提交
702

S
slguan 已提交
703 704
  *ppRow = pMeta->row;
  sdbIncRef(handle, pMeta->row);
H
hzcheng 已提交
705

706
  return pIter;
S
slguan 已提交
707 708
}

S
Shengliang Guan 已提交
709 710 711 712 713 714
/* Destroys an iterator obtained from sdbFetchRow; NULL is accepted. */
void sdbFreeIter(void *pIter) {
  if (pIter == NULL) return;
  taosHashDestroyIter(pIter);
}

S
slguan 已提交
715 716
/* Creates a table from its descriptor and registers it in tsSdbObj.tableList.
 * Returns NULL on an out-of-range tableId or when allocation fails.
 *
 * Both string key types are hashed over strlen(key) bytes everywhere in this
 * module, so both need the binary hash. The int hash reads a fixed 4 bytes,
 * which runs past short string keys and makes lookups depend on the bytes
 * after the NUL. */
void *sdbOpenTable(SSdbTableDesc *pDesc) {
  if ((int32_t)pDesc->tableId < 0 || (int32_t)pDesc->tableId >= SDB_TABLE_MAX) {
    sdbError("table:%s, invalid tableId:%d", pDesc->tableName, (int32_t)pDesc->tableId);
    return NULL;
  }

  SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
  if (pTable == NULL) return NULL;

  // bounded copy: tableName is a fixed TSDB_DB_NAME_LEN + 1 buffer
  snprintf(pTable->tableName, sizeof(pTable->tableName), "%s", pDesc->tableName);
  pTable->keyType      = pDesc->keyType;
  pTable->tableId      = pDesc->tableId;
  pTable->hashSessions = pDesc->hashSessions;
  pTable->maxRowSize   = pDesc->maxRowSize;
  pTable->refCountPos  = pDesc->refCountPos;
  pTable->insertFp     = pDesc->insertFp;
  pTable->deleteFp     = pDesc->deleteFp;
  pTable->updateFp     = pDesc->updateFp;
  pTable->encodeFp     = pDesc->encodeFp;
  pTable->decodeFp     = pDesc->decodeFp;
  pTable->destroyFp    = pDesc->destroyFp;
  pTable->restoredFp   = pDesc->restoredFp;

  _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  }
  pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
  if (pTable->iHandle == NULL) {
    sdbError("table:%s, failed to create hash", pTable->tableName);
    free(pTable);
    return NULL;
  }

  pthread_mutex_init(&pTable->mutex, NULL);

  tsSdbObj.numOfTables++;
  tsSdbObj.tableList[pTable->tableId] = pTable;
  return pTable;
}

void sdbCloseTable(void *handle) {
  SSdbTable *pTable = (SSdbTable *)handle;
  if (pTable == NULL) return;
S
slguan 已提交
750
  
S
slguan 已提交
751 752
  tsSdbObj.numOfTables--;
  tsSdbObj.tableList[pTable->tableId] = NULL;
H
hzcheng 已提交
753

754 755 756 757
  SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
  while (taosHashIterNext(pIter)) {
    SSdbRow *pMeta = taosHashIterGet(pIter);
    if (pMeta == NULL) continue;
S
slguan 已提交
758

S
slguan 已提交
759
    SSdbOper oper = {
760
      .pObj = pMeta->row,
S
slguan 已提交
761 762
      .table = pTable,
    };
763

S
slguan 已提交
764
    (*pTable->destroyFp)(&oper);
H
hzcheng 已提交
765 766
  }

767 768
  taosHashDestroyIter(pIter);
  taosHashCleanup(pTable->iHandle);
H
hzcheng 已提交
769 770

  pthread_mutex_destroy(&pTable->mutex);
S
slguan 已提交
771
  
S
slguan 已提交
772
  sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
S
slguan 已提交
773
  free(pTable);
H
hzcheng 已提交
774
}
S
slguan 已提交
775