mgmtSdb.c 21.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18
#include "taoserror.h"
19
#include "hash.h"
S
slguan 已提交
20
#include "trpc.h"
S
slguan 已提交
21 22
#include "tutil.h"
#include "tbalance.h"
S
slguan 已提交
23
#include "tqueue.h"
S
slguan 已提交
24
#include "twal.h"
S
slguan 已提交
25
#include "tsync.h"
S
slguan 已提交
26
#include "tglobal.h"
S
slguan 已提交
27 28
#include "dnode.h"
#include "mgmtDef.h"
guanshengliang's avatar
guanshengliang 已提交
29
#include "mgmtInt.h"
S
slguan 已提交
30
#include "mgmtMnode.h"
S
slguan 已提交
31
#include "mgmtSdb.h"
H
hzcheng 已提交
32

S
slguan 已提交
33 34 35 36 37 38 39 40 41
/*
 * Kind of change carried by an sdb record. The value is packed into
 * SWalHead.msgType as (tableId * 10 + action) and unpacked again in
 * sdbWrite(), so the numeric values are spelled out explicitly.
 */
typedef enum {
  SDB_ACTION_INSERT = 0,
  SDB_ACTION_DELETE = 1,
  SDB_ACTION_UPDATE = 2
} ESdbAction;

/*
 * Lifecycle state of the sdb module. The zero-initialised global starts as
 * OFFLINE, sdbInit() moves it to SERVING and sdbCleanUp() to CLOSING.
 */
typedef enum {
  SDB_STATUS_OFFLINE = 0,
  SDB_STATUS_SERVING = 1,
  SDB_STATUS_CLOSING = 2
} ESdbStatus;

S
slguan 已提交
45
typedef struct _SSdbTable {
S
slguan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
  char      tableName[TSDB_DB_NAME_LEN + 1];
  ESdbTable tableId;
  ESdbKey   keyType;
  int32_t   hashSessions;
  int32_t   maxRowSize;
  int32_t   refCountPos;
  int32_t   autoIndex;
  int64_t   numOfRows;
  void *    iHandle;
  int32_t (*insertFp)(SSdbOper *pDesc);
  int32_t (*deleteFp)(SSdbOper *pOper);
  int32_t (*updateFp)(SSdbOper *pOper);
  int32_t (*decodeFp)(SSdbOper *pOper);
  int32_t (*encodeFp)(SSdbOper *pOper);
  int32_t (*destroyFp)(SSdbOper *pOper);
  int32_t (*restoredFp)();
S
slguan 已提交
62 63 64
  pthread_mutex_t mutex;
} SSdbTable;

S
slguan 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78
/*
 * Global state of the sdb module (a single instance, tsSdbObj).
 */
typedef struct {
  ESyncRole       role;                      // sync role of this mnode, see sdbNotifyRole()
  ESdbStatus      status;                    // lifecycle state of the module
  int64_t         version;                   // version of the last record accepted by sdbWrite()
  void           *sync;                      // handle returned by syncStart(), NULL when sync is not running
  void           *wal;                       // handle returned by walOpen()
  SSyncCfg        cfg;                       // sync configuration last applied by sdbUpdateSync()
  sem_t           sem;                       // posted by sdbConfirmForward(), waited on in sdbForwardToPeer()
  int32_t         code;                      // result delivered together with sem
  int32_t         numOfTables;               // number of currently opened tables
  SSdbTable      *tableList[SDB_TABLE_MAX];  // opened tables indexed by tableId
  pthread_mutex_t mutex;                     // held while sdbWrite() assigns versions and writes the WAL
} SSdbObject;

S
slguan 已提交
79
/*
 * Value stored in a table's hash index for every row.
 */
typedef struct {
  int32_t rowSize;  // taken from SSdbOper.rowSize at insert time
  void   *row;      // the row object itself (SSdbOper.pObj)
} SSdbRow;
H
hzcheng 已提交
83

S
slguan 已提交
84 85
// The single, file-private sdb instance; zero-initialised at program start.
static SSdbObject tsSdbObj = {0};
// Forward declaration: sdbWrite() is used as a callback before its definition.
static int sdbWrite(void *param, void *data, int type);
S
slguan 已提交
86

S
slguan 已提交
87 88 89 90 91 92 93
int32_t sdbGetId(void *handle) {
  return ((SSdbTable *)handle)->autoIndex;
}

int64_t sdbGetNumOfRows(void *handle) {
  return ((SSdbTable *)handle)->numOfRows;
}
S
slguan 已提交
94

S
slguan 已提交
95
uint64_t sdbGetVersion() {
S
slguan 已提交
96 97 98 99 100
  return tsSdbObj.version;
}

/* True when this mnode currently holds the sync master role. */
bool sdbIsMaster() {
  if (tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) {
    return false;
  }
  return true;
}

S
slguan 已提交
103 104 105 106
/* True between a successful sdbInit() and the start of sdbCleanUp(). */
bool sdbIsServing() {
  if (tsSdbObj.status != SDB_STATUS_SERVING) {
    return false;
  }
  return true;
}

107 108 109 110 111 112 113 114
/*
 * Returns the key of a row object. For SDB_KEY_VAR_STRING tables the object
 * starts with a `char *` that points at the key; for every other key type the
 * key is stored inline at the start of the object.
 */
static void *sdbGetObjKey(SSdbTable *pTable, void *key) {
  return (pTable->keyType == SDB_KEY_VAR_STRING) ? (void *)(*(char **)key) : key;
}

S
slguan 已提交
115 116 117 118 119 120 121 122 123 124 125
/* Maps an ESdbAction value to a name for log messages; unknown values yield "invalid". */
static char *sdbGetActionStr(int32_t action) {
  if (action == SDB_ACTION_INSERT) return "insert";
  if (action == SDB_ACTION_DELETE) return "delete";
  if (action == SDB_ACTION_UPDATE) return "update";
  return "invalid";
}
S
slguan 已提交
126

127
/*
 * Renders a key for log output.
 * String keys are returned as-is. Integer keys are formatted into a static
 * buffer, so the result is overwritten by the next call and is not safe to
 * use from several threads at once.
 */
static char *sdbGetKeyStr(SSdbTable *pTable, void *key) {
  static char str[16];
  ESdbKey     keyType = pTable->keyType;

  if (keyType == SDB_KEY_STRING || keyType == SDB_KEY_VAR_STRING) {
    return (char *)key;
  }

  if (keyType == SDB_KEY_INT || keyType == SDB_KEY_AUTO) {
    sprintf(str, "%d", *(int32_t *)key);
    return str;
  }

  return "invalid";
}

142 143 144 145
/* Same as sdbGetKeyStr(), but takes the row object and extracts its key first. */
static char *sdbGetKeyStrFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetKeyStr(pTable, objKey);
}

S
slguan 已提交
146
static void *sdbGetTableFromId(int32_t tableId) {
S
slguan 已提交
147
  return tsSdbObj.tableList[tableId];
S
slguan 已提交
148 149
}

S
slguan 已提交
150
static int32_t sdbInitWal() {
H
hjxilinx 已提交
151
  SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1};
S
slguan 已提交
152 153 154
  char temp[TSDB_FILENAME_LEN];
  sprintf(temp, "%s/wal", tsMnodeDir);
  tsSdbObj.wal = walOpen(temp, &walCfg);
S
slguan 已提交
155 156
  if (tsSdbObj.wal == NULL) {
    sdbError("failed to open sdb wal in %s", tsMnodeDir);
H
hzcheng 已提交
157 158 159
    return -1;
  }

S
slguan 已提交
160
  sdbTrace("open sdb wal for restore");
S
slguan 已提交
161
  walRestore(tsSdbObj.wal, NULL, sdbWrite);
S
slguan 已提交
162 163
  return 0;
}
H
hzcheng 已提交
164

S
slguan 已提交
165
static void sdbRestoreTables() {
S
slguan 已提交
166 167
  int32_t totalRows = 0;
  int32_t numOfTables = 0;
S
slguan 已提交
168
  for (int32_t tableId = 0; tableId < SDB_TABLE_MAX; ++tableId) {
S
slguan 已提交
169 170
    SSdbTable *pTable = sdbGetTableFromId(tableId);
    if (pTable == NULL) continue;
S
slguan 已提交
171 172
    if (pTable->restoredFp) {
      (*pTable->restoredFp)();
H
hzcheng 已提交
173 174
    }

S
slguan 已提交
175 176
    totalRows += pTable->numOfRows;
    numOfTables++;
S
slguan 已提交
177 178 179 180 181 182 183 184 185 186 187 188
    sdbTrace("table:%s, is restored, numOfRows:%d", pTable->tableName, pTable->numOfRows);
  }

  sdbTrace("sdb is restored, version:%d totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables);
}

void sdbUpdateMnodeRoles() {
  if (tsSdbObj.sync == NULL) return;

  SNodesRole roles = {0};
  syncGetNodesRole(tsSdbObj.sync, &roles);

S
slguan 已提交
189
  sdbPrint("update mnodes:%d sync roles", tsSdbObj.cfg.replica);
S
slguan 已提交
190 191 192 193
  for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) {
    SMnodeObj *pMnode = mgmtGetMnode(roles.nodeId[i]);
    if (pMnode != NULL) {
      pMnode->role = roles.role[i];
S
slguan 已提交
194
      sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role));
195
      if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbObj.role = pMnode->role;
196
      mgmtDecMnodeRef(pMnode);
S
slguan 已提交
197 198 199 200
    }
  }
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
/*
 * getFileInfo callback registered with the sync module. It reports no file
 * (always returns 0) and uses the call only to refresh the mnode roles; none
 * of the parameters are read or written.
 */
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
  const uint32_t noFile = 0;

  sdbUpdateMnodeRoles();
  return noFile;
}

static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) {
S
slguan 已提交
207
  return walGetWalFile(tsSdbObj.wal, name, index);
S
slguan 已提交
208 209 210
}

static void sdbNotifyRole(void *ahandle, int8_t role) {
S
slguan 已提交
211
  sdbPrint("mnode role changed from %s to %s", mgmtGetMnodeRoleStr(tsSdbObj.role), mgmtGetMnodeRoleStr(role));
S
slguan 已提交
212 213 214 215 216 217 218 219 220 221 222 223

  if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) {
    balanceReset();
  }
  tsSdbObj.role = role;

  sdbUpdateMnodeRoles();
}

/*
 * confirmForward callback registered with the sync module. Publishes the
 * result in tsSdbObj.code and wakes the writer blocked in sdbForwardToPeer().
 * `param` is the record version that sdbForwardToPeer() passed as an opaque
 * pointer.
 */
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
  // convert the pointer through uintptr_t so the value matches the PRIu64 conversion below
  uint64_t version = (uint64_t)(uintptr_t)param;

  tsSdbObj.code = code;
  sem_post(&tsSdbObj.sem);
  sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", version, tstrerror(code));
}

S
slguan 已提交
227
/*
 * Forwards a WAL record to the peers through the sync module.
 * Returns TSDB_CODE_SUCCESS when sync is not running. When syncForwardToPeer()
 * returns a positive value the call blocks until sdbConfirmForward() posts the
 * semaphore and then returns the confirmed code; otherwise the value returned
 * by syncForwardToPeer() is passed through.
 */
static int32_t sdbForwardToPeer(SWalHead *pHead) {
  if (tsSdbObj.sync == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version);
  if (code <= 0) {
    return code;
  }

  sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
  sem_wait(&tsSdbObj.sem);
  return tsSdbObj.code;
}

void sdbUpdateSync() {
  SSyncCfg syncCfg = {0};
  int32_t index = 0;

S
slguan 已提交
243
  SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
S
slguan 已提交
244
  for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
S
slguan 已提交
245
    SDMMnodeInfo *node = &mnodes->nodeInfos[i];
S
slguan 已提交
246
    syncCfg.nodeInfo[i].nodeId = node->nodeId;
J
jtao1735 已提交
247 248
    taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
    syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
S
slguan 已提交
249 250 251 252
    index++;
  }

  if (index == 0) {
S
Shengliang Guan 已提交
253
    void *pIter = NULL;
S
slguan 已提交
254 255
    while (1) {
      SMnodeObj *pMnode = NULL;
S
Shengliang Guan 已提交
256
      pIter = mgmtGetNextMnode(pIter, &pMnode);
S
slguan 已提交
257 258 259
      if (pMnode == NULL) break;

      syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;
J
jtao1735 已提交
260 261
      syncCfg.nodeInfo[index].nodePort = pMnode->pDnode->dnodePort + TSDB_PORT_SYNC;
      strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp);
S
slguan 已提交
262 263
      index++;

264
      mgmtDecMnodeRef(pMnode);
S
slguan 已提交
265
    }
S
Shengliang Guan 已提交
266
    sdbFreeIter(pIter);
S
slguan 已提交
267 268 269
  }

  syncCfg.replica = index;
J
jtao1735 已提交
270
  syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
S
slguan 已提交
271 272 273 274 275 276 277 278 279 280 281 282

  bool hasThisDnode = false;
  for (int32_t i = 0; i < syncCfg.replica; ++i) {
    if (syncCfg.nodeInfo[i].nodeId == dnodeGetDnodeId()) {
      hasThisDnode = true;
      break;
    }
  }

  if (!hasThisDnode) return;
  if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;

J
jtao1735 已提交
283
  sdbPrint("work as mnode, replica:%d", syncCfg.replica);
S
slguan 已提交
284
  for (int32_t i = 0; i < syncCfg.replica; ++i) {
J
jtao1735 已提交
285
    sdbPrint("mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, syncCfg.nodeInfo[i].nodePort);
S
slguan 已提交
286
  }
S
slguan 已提交
287

guanshengliang's avatar
guanshengliang 已提交
288
  SSyncInfo syncInfo = {0};
S
slguan 已提交
289 290 291
  syncInfo.vgId = 1;
  syncInfo.version = sdbGetVersion();
  syncInfo.syncCfg = syncCfg;
S
slguan 已提交
292
  sprintf(syncInfo.path, "%s", tsMnodeDir);
S
slguan 已提交
293 294 295 296 297 298 299
  syncInfo.ahandle = NULL;
  syncInfo.getWalInfo = sdbGetWalInfo;
  syncInfo.getFileInfo = sdbGetFileInfo;
  syncInfo.writeToCache = sdbWrite;
  syncInfo.confirmForward = sdbConfirmForward; 
  syncInfo.notifyRole = sdbNotifyRole;
  tsSdbObj.cfg = syncCfg;
300
  
S
slguan 已提交
301 302 303 304 305
  if (tsSdbObj.sync) {
    syncReconfig(tsSdbObj.sync, &syncCfg);
  } else {
    tsSdbObj.sync = syncStart(&syncInfo);
  }
306
  sdbUpdateMnodeRoles();
S
slguan 已提交
307 308 309 310 311 312 313 314 315
}

int32_t sdbInit() {
  pthread_mutex_init(&tsSdbObj.mutex, NULL);
  sem_init(&tsSdbObj.sem, 0, 0);

  if (sdbInitWal() != 0) {
    return -1;
  }
S
slguan 已提交
316
  
S
slguan 已提交
317
  sdbRestoreTables();
S
slguan 已提交
318

S
slguan 已提交
319 320 321 322 323 324 325
  if (mgmtGetMnodesNum() == 1) {
    tsSdbObj.role = TAOS_SYNC_ROLE_MASTER;
  }

  sdbUpdateSync();

  tsSdbObj.status = SDB_STATUS_SERVING;
S
slguan 已提交
326
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
327 328
}

S
slguan 已提交
329
void sdbCleanUp() {
S
slguan 已提交
330 331
  if (tsSdbObj.status != SDB_STATUS_SERVING) return;

S
slguan 已提交
332
  tsSdbObj.status = SDB_STATUS_CLOSING;
guanshengliang's avatar
guanshengliang 已提交
333 334 335 336 337 338 339 340 341 342 343
  
  if (tsSdbObj.sync) {
    syncStop(tsSdbObj.sync);
    tsSdbObj.sync = NULL;
  }

  if (tsSdbObj.wal) {
    walClose(tsSdbObj.wal);
    tsSdbObj.wal = NULL;
  }
  
S
slguan 已提交
344 345
  sem_destroy(&tsSdbObj.sem);
  pthread_mutex_destroy(&tsSdbObj.mutex);
H
hzcheng 已提交
346 347
}

348 349 350 351 352 353 354 355
/*
 * Atomically increments the reference counter embedded in a row object.
 * `handle` is the owning SSdbTable, which supplies the counter offset
 * (refCountPos). A NULL `pObj` is ignored.
 */
void sdbIncRef(void *handle, void *pObj) {
  if (pObj == NULL) return;

  SSdbTable *pTable = handle;
  // byte-wise offset through char *: arithmetic on void * is a compiler extension
  int32_t *pRefCount = (int32_t *)((char *)pObj + pTable->refCountPos);
  atomic_add_fetch_32(pRefCount, 1);
}

359 360 361 362 363 364 365 366
/*
 * Atomically decrements the reference counter embedded in a row object.
 * When the counter drops to zero or below and the "updateEnd" flag (the byte
 * just before the counter, set by sdbDeleteHash()) is non-zero, the object is
 * handed to the table's destroyFp. A NULL `pObj` is ignored.
 */
void sdbDecRef(void *handle, void *pObj) {
  if (pObj == NULL) return;

  SSdbTable *pTable = handle;
  // byte-wise offsets through char *: arithmetic on void * is a compiler extension
  char    *base = pObj;
  int32_t *pRefCount = (int32_t *)(base + pTable->refCountPos);
  int32_t  refCount = atomic_sub_fetch_32(pRefCount, 1);

  int8_t *updateEnd = (int8_t *)(base + pTable->refCountPos - 1);
  if (refCount <= 0 && *updateEnd) {
    // log the value returned by the atomic op instead of re-reading the shared counter
    sdbTrace("table:%s, record:%s:%d is destroyed", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), refCount);
    SSdbOper oper = {.pObj = pObj};
    (*pTable->destroyFp)(&oper);
  }
}
S
slguan 已提交
376

377 378
/*
 * Looks `key` up in the table's hash index and returns the stored SSdbRow, or
 * NULL when the table is NULL or the key is absent. Takes no lock and no
 * reference. String keys are hashed over strlen(key) bytes, all other keys
 * over sizeof(int32_t) bytes.
 */
static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) {
  if (pTable == NULL) {
    return NULL;
  }

  bool    isStringKey = (pTable->keyType == SDB_KEY_STRING) || (pTable->keyType == SDB_KEY_VAR_STRING);
  int32_t keySize = isStringKey ? (int32_t)strlen((char *)key) : (int32_t)sizeof(int32_t);

  return taosHashGet(pTable->iHandle, key, keySize);
}
S
slguan 已提交
387

388 389
/* Same as sdbGetRowMeta(), but takes a row object and extracts its key first. */
static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetRowMeta(pTable, objKey);
}

H
hzcheng 已提交
392 393
void *sdbGetRow(void *handle, void *key) {
  SSdbTable *pTable = (SSdbTable *)handle;
S
slguan 已提交
394
  SSdbRow * pMeta;
H
hzcheng 已提交
395 396 397 398

  if (handle == NULL) return NULL;

  pthread_mutex_lock(&pTable->mutex);
399 400

  int32_t keySize = sizeof(int32_t);
401
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
402 403 404 405
    keySize = strlen((char *)key);
  }
  pMeta = taosHashGet(pTable->iHandle, key, keySize);

S
slguan 已提交
406
  if (pMeta) sdbIncRef(pTable, pMeta->row);
H
hzcheng 已提交
407 408
  pthread_mutex_unlock(&pTable->mutex);

409
  if (pMeta == NULL) return NULL;
H
hzcheng 已提交
410 411 412 413

  return pMeta->row;
}

414 415 416 417
/* Same as sdbGetRow(), but takes a row object and extracts its key first. */
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
  void *objKey = sdbGetObjKey(pTable, key);
  return sdbGetRow(pTable, objKey);
}

S
slguan 已提交
418 419
/*
 * Adds pOper->pObj to the table's hash index, takes a reference on it,
 * updates the row and auto-index counters and finally calls the table's
 * insertFp. Always returns TSDB_CODE_SUCCESS; the results of taosHashPut()
 * and insertFp are not inspected.
 */
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
  SSdbRow rowMeta;
  rowMeta.rowSize = pOper->rowSize;
  rowMeta.row = pOper->pObj;

  pthread_mutex_lock(&pTable->mutex);

  void   *key = sdbGetObjKey(pTable, pOper->pObj);
  int32_t keySize = sizeof(int32_t);

  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    keySize = strlen((char *)key);
  }

  taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));

  sdbIncRef(pTable, pOper->pObj);
  pTable->numOfRows++;
  int64_t numOfRows = pTable->numOfRows;  // snapshot for the log line, taken under the lock

  if (pTable->keyType == SDB_KEY_AUTO) {
    // the object starts with its id; keep autoIndex at the largest id seen so far
    pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj));
  } else {
    pTable->autoIndex++;
  }

  pthread_mutex_unlock(&pTable->mutex);

  // numOfRows is int64_t: printing it with %d also misaligned the version argument that follows
  sdbTrace("table:%s, insert record:%s to hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), numOfRows, sdbGetVersion());

  (*pTable->insertFp)(pOper);
  return TSDB_CODE_SUCCESS;
}
H
hzcheng 已提交
451

S
slguan 已提交
452
/*
 * Calls the table's deleteFp, removes pOper->pObj from the hash index, marks
 * the object as finished (the "updateEnd" byte just before its ref counter)
 * and drops the reference held by the index; sdbDecRef() destroys the object
 * once its counter reaches zero. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
  (*pTable->deleteFp)(pOper);

  pthread_mutex_lock(&pTable->mutex);

  void   *key = sdbGetObjKey(pTable, pOper->pObj);
  int32_t keySize = sizeof(int32_t);

  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    keySize = strlen((char *)key);
  }

  taosHashRemove(pTable->iHandle, key, keySize);

  pTable->numOfRows--;
  int64_t numOfRows = pTable->numOfRows;  // snapshot for the log line, taken under the lock
  pthread_mutex_unlock(&pTable->mutex);

  // numOfRows is int64_t: printing it with %d also misaligned the version argument that follows
  sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), numOfRows, sdbGetVersion());

  // byte-wise offset through char *: arithmetic on void * is a compiler extension
  int8_t *updateEnd = (int8_t *)((char *)pOper->pObj + pTable->refCountPos - 1);
  *updateEnd = 1;
  sdbDecRef(pTable, pOper->pObj);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
479
/*
 * Applies an update: logs it and calls the table's updateFp. The hash index
 * itself is not touched. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
  // numOfRows is int64_t: printing it with %d also misaligned the version argument that follows
  sdbTrace("table:%s, update record:%s in hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
           sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());

  (*pTable->updateFp)(pOper);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
487 488 489 490
static int sdbWrite(void *param, void *data, int type) {
  SWalHead *pHead = data;
  int32_t   tableId = pHead->msgType / 10;
  int32_t   action = pHead->msgType % 10;
S
slguan 已提交
491

S
slguan 已提交
492 493
  SSdbTable *pTable = sdbGetTableFromId(tableId);
  assert(pTable != NULL);
S
slguan 已提交
494

S
slguan 已提交
495 496 497 498 499 500 501 502 503
  pthread_mutex_lock(&tsSdbObj.mutex);
  if (pHead->version == 0) {
     // assign version
    tsSdbObj.version++;
    pHead->version = tsSdbObj.version;
  } else {
    // for data from WAL or forward, version may be smaller
    if (pHead->version <= tsSdbObj.version) {
      pthread_mutex_unlock(&tsSdbObj.mutex);
S
slguan 已提交
504 505 506 507
      if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
        sdbTrace("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
        syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
      }
S
slguan 已提交
508 509 510 511
      return TSDB_CODE_SUCCESS;
    } else if (pHead->version != tsSdbObj.version + 1) {
      pthread_mutex_unlock(&tsSdbObj.mutex);
      sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64,
512
               pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
S
slguan 已提交
513 514 515 516 517
               tsSdbObj.version);
      return TSDB_CODE_OTHERS;
    } else {
      tsSdbObj.version = pHead->version;
    }
S
slguan 已提交
518
  }
S
slguan 已提交
519

S
slguan 已提交
520
  int32_t code = walWrite(tsSdbObj.wal, pHead);
S
slguan 已提交
521
  if (code < 0) {
S
slguan 已提交
522 523
    pthread_mutex_unlock(&tsSdbObj.mutex);
    return code;
S
slguan 已提交
524
  }
S
slguan 已提交
525
  walFsync(tsSdbObj.wal);
S
slguan 已提交
526

S
slguan 已提交
527
  code = sdbForwardToPeer(pHead);
S
slguan 已提交
528
  pthread_mutex_unlock(&tsSdbObj.mutex);
S
slguan 已提交
529

S
slguan 已提交
530
  // from app, oper is created
S
slguan 已提交
531 532 533 534 535 536
  if (param != NULL) {
    //sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
    return code;
  }
  
  // from wal or forward msg, oper not created, should add into hash
S
slguan 已提交
537
  if (tsSdbObj.sync != NULL) {
S
slguan 已提交
538
    sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code));
S
slguan 已提交
539 540
    syncConfirmForward(tsSdbObj.sync, pHead->version, code);
  }
S
slguan 已提交
541 542

  if (action == SDB_ACTION_INSERT) {
S
slguan 已提交
543
    SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
S
slguan 已提交
544
    code = (*pTable->decodeFp)(&oper);
S
slguan 已提交
545
    return sdbInsertHash(pTable, &oper);
S
slguan 已提交
546
  } else if (action == SDB_ACTION_DELETE) {
S
slguan 已提交
547
    SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
S
slguan 已提交
548
    assert(rowMeta != NULL && rowMeta->row != NULL);
S
slguan 已提交
549 550
    SSdbOper oper = {.table = pTable, .pObj = rowMeta->row};
    return sdbDeleteHash(pTable, &oper);
S
slguan 已提交
551
  } else if (action == SDB_ACTION_UPDATE) {
S
slguan 已提交
552
    SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
S
slguan 已提交
553
    assert(rowMeta != NULL && rowMeta->row != NULL);
S
slguan 已提交
554
    SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
S
slguan 已提交
555
    code = (*pTable->decodeFp)(&oper);
S
slguan 已提交
556 557
    return sdbUpdateHash(pTable, &oper);
  } else { return TSDB_CODE_INVALID_MSG_TYPE; }
S
slguan 已提交
558 559
}

S
slguan 已提交
560
/*
 * Inserts pOper->pObj into pOper->table.
 *
 * Fails with TSDB_CODE_ALREADY_THERE when a row with the same key exists.
 * For SDB_KEY_AUTO tables the id stored at the start of the object is
 * assigned from the table's auto index. For SDB_OPER_GLOBAL operations the
 * row is encoded and passed through sdbWrite() before the in-memory hash is
 * updated.
 *
 * Returns -1 when the table is NULL or the WAL buffer cannot be allocated, a
 * negative sdbWrite() result, or the result of sdbInsertHash().
 */
int32_t sdbInsertRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  void *pExisting = sdbGetRowFromObj(pTable, pOper->pObj);
  if (pExisting != NULL) {
    sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj));
    // release the reference sdbGetRowFromObj() took on the stored row, not on the caller's object
    sdbDecRef(pTable, pExisting);
    return TSDB_CODE_ALREADY_THERE;
  }

  if (pTable->keyType == SDB_KEY_AUTO) {
    pthread_mutex_lock(&pTable->mutex);
    *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;

    // let vgId increase from 2
    if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
      *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
    }
    pthread_mutex_unlock(&pTable->mutex);
  }

  if (pOper->type == SDB_OPER_GLOBAL) {
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to insert record:%s, out of memory", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;  // 0 asks sdbWrite() to assign the next version
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;

    // encode straight into the WAL buffer; encodeFp may change rowSize, so len is set afterwards
    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbInsertHash(pTable, pOper);
}

S
slguan 已提交
600
/*
 * Deletes the row identified by pOper->pObj from pOper->table.
 *
 * For SDB_OPER_GLOBAL operations a delete record carrying only the key
 * (string keys include the terminating NUL) is passed through sdbWrite()
 * before the in-memory hash is updated.
 *
 * Returns -1 when the table is NULL, the row does not exist, the key type is
 * unknown or the WAL buffer cannot be allocated; a negative sdbWrite() result;
 * or the result of sdbDeleteHash().
 */
int32_t sdbDeleteRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
  if (pMeta == NULL) {
    sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
    return -1;
  }

  assert(pMeta->row != NULL);

  if (pOper->type == SDB_OPER_GLOBAL) {
    void   *key = sdbGetObjKey(pTable, pOper->pObj);
    int32_t keySize = 0;
    switch (pTable->keyType) {
      case SDB_KEY_STRING:
      case SDB_KEY_VAR_STRING:
        keySize = strlen((char *)key) + 1;
        break;
      case SDB_KEY_INT:
      case SDB_KEY_AUTO:
        keySize = sizeof(uint32_t);
        break;
      default:
        return -1;
    }

    int32_t   size = sizeof(SWalHead) + keySize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to delete record:%s, out of memory", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;  // 0 asks sdbWrite() to assign the next version
    pHead->len = keySize;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
    memcpy(pHead->cont, key, keySize);

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbDeleteHash(pTable, pOper);
}

S
slguan 已提交
644
/*
 * Updates the row identified by pOper->pObj in pOper->table.
 *
 * For SDB_OPER_GLOBAL operations the row is encoded and passed through
 * sdbWrite() before the table's update callback runs.
 *
 * Returns -1 when the table is NULL, the row does not exist or the WAL buffer
 * cannot be allocated; a negative sdbWrite() result; or the result of
 * sdbUpdateHash().
 */
int32_t sdbUpdateRow(SSdbOper *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
  if (pMeta == NULL) {
    // the message used to say "delete failed", copied from sdbDeleteRow()
    sdbTrace("table:%s, record is not there, update failed", pTable->tableName);
    return -1;
  }

  assert(pMeta->row != NULL);

  if (pOper->type == SDB_OPER_GLOBAL) {
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
    SWalHead *pHead = taosAllocateQitem(size);
    if (pHead == NULL) {
      sdbError("table:%s, failed to update record:%s, out of memory", pTable->tableName,
               sdbGetKeyStrFromObj(pTable, pOper->pObj));
      return -1;
    }

    pHead->version = 0;  // 0 asks sdbWrite() to assign the next version
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;

    // encode straight into the WAL buffer; encodeFp may change rowSize, so len is set afterwards
    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

    int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
    taosFreeQitem(pHead);
    if (code < 0) return code;
  }

  return sdbUpdateHash(pTable, pOper);
}
S
slguan 已提交
674

S
slguan 已提交
675 676 677 678
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
  SSdbTable *pTable = (SSdbTable *)handle;
  *ppRow = NULL;
  if (pTable == NULL) return NULL;
H
hzcheng 已提交
679

680 681 682 683 684 685 686 687 688 689 690
  SHashMutableIterator *pIter = pNode;
  if (pIter == NULL) {
    pIter = taosHashCreateIter(pTable->iHandle);
  }

  if (!taosHashIterNext(pIter)) {
    taosHashDestroyIter(pIter);
    return NULL;
  }

  SSdbRow *pMeta = taosHashIterGet(pIter);
S
Shengliang Guan 已提交
691 692 693 694
  if (pMeta == NULL) {
    taosHashDestroyIter(pIter);
    return NULL;
  }
S
slguan 已提交
695

S
slguan 已提交
696 697
  *ppRow = pMeta->row;
  sdbIncRef(handle, pMeta->row);
H
hzcheng 已提交
698

699
  return pIter;
S
slguan 已提交
700 701
}

S
Shengliang Guan 已提交
702 703 704 705 706 707
/* Releases an iterator returned by sdbFetchRow(); NULL is accepted and ignored. */
void sdbFreeIter(void *pIter) {
  if (pIter == NULL) {
    return;
  }
  taosHashDestroyIter(pIter);
}

S
slguan 已提交
708 709
/*
 * Creates a table from `pDesc`, registers it in tsSdbObj.tableList under
 * pDesc->tableId and returns its handle.
 * Returns NULL when pDesc is NULL, the table id is out of range, or memory or
 * the hash index cannot be allocated.
 */
void *sdbOpenTable(SSdbTableDesc *pDesc) {
  if (pDesc == NULL) return NULL;

  // the id is used as an index into tableList, so validate it first
  int32_t tableId = (int32_t)pDesc->tableId;
  if (tableId < 0 || tableId >= SDB_TABLE_MAX) {
    sdbError("table:%s, failed to open, invalid table id:%d", pDesc->tableName, tableId);
    return NULL;
  }

  SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
  if (pTable == NULL) return NULL;

  // bounded copy: a long descriptor name is truncated instead of overflowing tableName
  snprintf(pTable->tableName, sizeof(pTable->tableName), "%s", pDesc->tableName);
  pTable->keyType      = pDesc->keyType;
  pTable->tableId      = pDesc->tableId;
  pTable->hashSessions = pDesc->hashSessions;
  pTable->maxRowSize   = pDesc->maxRowSize;
  pTable->refCountPos  = pDesc->refCountPos;
  pTable->insertFp     = pDesc->insertFp;
  pTable->deleteFp     = pDesc->deleteFp;
  pTable->updateFp     = pDesc->updateFp;
  pTable->encodeFp     = pDesc->encodeFp;
  pTable->decodeFp     = pDesc->decodeFp;
  pTable->destroyFp    = pDesc->destroyFp;
  pTable->restoredFp   = pDesc->restoredFp;

  // Both string key types are looked up with strlen(key) as key size elsewhere in this file,
  // so both need the binary hash function.
  _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
    hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  }

  pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
  if (pTable->iHandle == NULL) {
    sdbError("table:%s, failed to open, hash init failed", pTable->tableName);
    free(pTable);
    return NULL;
  }

  pthread_mutex_init(&pTable->mutex, NULL);

  tsSdbObj.numOfTables++;
  tsSdbObj.tableList[pTable->tableId] = pTable;
  return pTable;
}

void sdbCloseTable(void *handle) {
  SSdbTable *pTable = (SSdbTable *)handle;
  if (pTable == NULL) return;
S
slguan 已提交
743
  
S
slguan 已提交
744 745
  tsSdbObj.numOfTables--;
  tsSdbObj.tableList[pTable->tableId] = NULL;
H
hzcheng 已提交
746

747 748 749 750
  SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
  while (taosHashIterNext(pIter)) {
    SSdbRow *pMeta = taosHashIterGet(pIter);
    if (pMeta == NULL) continue;
S
slguan 已提交
751

S
slguan 已提交
752
    SSdbOper oper = {
753
      .pObj = pMeta->row,
S
slguan 已提交
754 755
      .table = pTable,
    };
756

S
slguan 已提交
757
    (*pTable->destroyFp)(&oper);
H
hzcheng 已提交
758 759
  }

760 761
  taosHashDestroyIter(pIter);
  taosHashCleanup(pTable->iHandle);
H
hzcheng 已提交
762 763

  pthread_mutex_destroy(&pTable->mutex);
S
slguan 已提交
764
  
S
slguan 已提交
765
  sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
S
slguan 已提交
766
  free(pTable);
H
hzcheng 已提交
767
}
S
slguan 已提交
768