mgmtSdb.c 16.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18
#include "taoserror.h"
S
slguan 已提交
19 20
#include "tlog.h"
#include "trpc.h"
S
slguan 已提交
21
#include "tqueue.h"
S
slguan 已提交
22
#include "twal.h"
S
slguan 已提交
23 24
#include "hashint.h"
#include "hashstr.h"
S
slguan 已提交
25
#include "mpeer.h"
S
slguan 已提交
26
#include "mgmtSdb.h"
H
hzcheng 已提交
27

S
slguan 已提交
28
typedef struct _SSdbTable {
S
slguan 已提交
29 30
  char        tableName[TSDB_DB_NAME_LEN + 1];
  ESdbTable   tableId;
S
slguan 已提交
31 32 33
  ESdbKeyType keyType;
  int32_t     hashSessions;
  int32_t     maxRowSize;
S
slguan 已提交
34
  int32_t     refCountPos;
S
slguan 已提交
35
  int32_t     autoIndex;
S
slguan 已提交
36 37
  int64_t     numOfRows;
  void *      iHandle;
S
slguan 已提交
38 39 40 41 42 43 44
  int32_t   (*insertFp)(SSdbOperDesc *pDesc);
  int32_t   (*deleteFp)(SSdbOperDesc *pOper);
  int32_t   (*updateFp)(SSdbOperDesc *pOper);
  int32_t   (*decodeFp)(SSdbOperDesc *pOper);
  int32_t   (*encodeFp)(SSdbOperDesc *pOper);
  int32_t   (*destroyFp)(SSdbOperDesc *pOper);
  int32_t   (*restoredFp)();
S
slguan 已提交
45 46 47 48
  pthread_mutex_t mutex;
} SSdbTable;

typedef struct {
S
slguan 已提交
49
  int32_t rowSize;
S
slguan 已提交
50 51 52
  void *  row;
} SRowMeta;

S
slguan 已提交
53
typedef enum {
S
slguan 已提交
54 55 56 57
  SDB_ACTION_INSERT,
  SDB_ACTION_DELETE,
  SDB_ACTION_UPDATE
} ESdbActionType;
H
hzcheng 已提交
58

S
slguan 已提交
59 60
static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0};
static int32_t    tsSdbNumOfTables = 0;
S
slguan 已提交
61
static SSdbObject * tsSdbObj;
H
hzcheng 已提交
62

S
slguan 已提交
63 64 65 66 67 68
static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash};
static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash};
static void  (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash};
static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData};
static void  (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
S
slguan 已提交
69

S
slguan 已提交
70
uint64_t sdbGetVersion() { return tsSdbObj->version; }
S
slguan 已提交
71
int32_t  sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
S
slguan 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84
int64_t  sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }

static char *sdbGetActionStr(int32_t action) {
  switch (action) {
    case SDB_ACTION_INSERT:
      return "insert";
    case SDB_ACTION_DELETE:
      return "delete";
    case SDB_ACTION_UPDATE:
      return "update";
  }
  return "invalid";
}
S
slguan 已提交
85

S
slguan 已提交
86 87 88
static char *sdbGetkeyStr(SSdbTable *pTable, void *row) {
  static char str[16];
  switch (pTable->keyType) {
S
slguan 已提交
89
    case SDB_KEY_STRING:
S
slguan 已提交
90
      return (char *)row;
S
slguan 已提交
91
    case SDB_KEY_INT:
S
slguan 已提交
92
    case SDB_KEY_AUTO:
S
slguan 已提交
93 94 95
      sprintf(str, "%d", *(int32_t *)row);
      return str;
    default:
S
slguan 已提交
96
      return "invalid";
S
slguan 已提交
97 98 99
  }
}

S
slguan 已提交
100 101
static void *sdbGetTableFromId(int32_t tableId) {
  return tsSdbTableList[tableId];
S
slguan 已提交
102 103
}

S
slguan 已提交
104
int32_t sdbInit() {
S
slguan 已提交
105 106
  tsSdbObj = calloc(1, sizeof(SSdbObject));
  pthread_mutex_init(&tsSdbObj->mutex, NULL);
S
slguan 已提交
107 108

  SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1};
S
slguan 已提交
109 110
  tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg);
  if (tsSdbObj->wal == NULL) {
S
slguan 已提交
111
    sdbError("failed to open sdb in %s", tsMnodeDir);
H
hzcheng 已提交
112 113 114
    return -1;
  }

S
slguan 已提交
115
  sdbTrace("open sdb file for read");
S
slguan 已提交
116
  walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite);
H
hzcheng 已提交
117

S
slguan 已提交
118 119
  int32_t totalRows = 0;
  int32_t numOfTables = 0;
S
slguan 已提交
120
  for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) {
S
slguan 已提交
121 122
    SSdbTable *pTable = sdbGetTableFromId(tableId);
    if (pTable == NULL) continue;
S
slguan 已提交
123 124
    if (pTable->restoredFp) {
      (*pTable->restoredFp)();
H
hzcheng 已提交
125 126
    }

S
slguan 已提交
127 128 129
    totalRows += pTable->numOfRows;
    numOfTables++;
    sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows);
S
slguan 已提交
130
  }
S
slguan 已提交
131

S
slguan 已提交
132
  sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables);
S
slguan 已提交
133
  return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
134 135
}

S
slguan 已提交
136
void sdbCleanUp() {
S
slguan 已提交
137 138 139 140 141
  if (tsSdbObj) {
    pthread_mutex_destroy(&tsSdbObj->mutex);
    walClose(tsSdbObj->wal);
    free(tsSdbObj);
    tsSdbObj = NULL;
S
slguan 已提交
142
  }
H
hzcheng 已提交
143 144
}

S
slguan 已提交
145 146 147 148
SSdbObject *sdbGetObj() {
  return tsSdbObj;
}

S
slguan 已提交
149 150 151
void sdbIncRef(void *handle, void *pRow) {
  if (pRow) {
    SSdbTable *pTable = handle;
S
slguan 已提交
152
    int32_t *  pRefCount = (int32_t *)(pRow + pTable->refCountPos);
S
slguan 已提交
153
    atomic_add_fetch_32(pRefCount, 1);
S
slguan 已提交
154 155 156 157
    if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
      sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
               *pRefCount);
    }
S
slguan 已提交
158 159 160 161 162 163
  }
}

void sdbDecRef(void *handle, void *pRow) {
  if (pRow) {
    SSdbTable *pTable = handle;
S
slguan 已提交
164 165 166 167 168 169 170
    int32_t *  pRefCount = (int32_t *)(pRow + pTable->refCountPos);
    int32_t    refCount = atomic_sub_fetch_32(pRefCount, 1);
    if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
      sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
               *pRefCount);
    }
    int8_t *updateEnd = pRow + pTable->refCountPos - 1;
S
slguan 已提交
171
    if (refCount <= 0 && *updateEnd) {
S
slguan 已提交
172 173
      sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName,
               sdbGetkeyStr(pTable, pRow), *pRefCount);
S
slguan 已提交
174 175
      SSdbOperDesc oper = {.pObj = pRow};
      (*pTable->destroyFp)(&oper);
S
slguan 已提交
176 177 178 179
    }
  }
}

S
slguan 已提交
180 181 182 183 184 185 186 187 188 189 190
static SRowMeta *sdbGetRowMeta(void *handle, void *key) {
  SSdbTable *pTable = (SSdbTable *)handle;
  SRowMeta * pMeta;

  if (handle == NULL) return NULL;

  pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key);

  return pMeta;
}

H
hzcheng 已提交
191 192 193 194 195 196 197 198
void *sdbGetRow(void *handle, void *key) {
  SSdbTable *pTable = (SSdbTable *)handle;
  SRowMeta * pMeta;

  if (handle == NULL) return NULL;

  pthread_mutex_lock(&pTable->mutex);
  pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key);
S
slguan 已提交
199
  if (pMeta) sdbIncRef(pTable, pMeta->row);
H
hzcheng 已提交
200 201
  pthread_mutex_unlock(&pTable->mutex);

S
slguan 已提交
202 203 204
  if (pMeta == NULL) {
    return NULL;
  }
H
hzcheng 已提交
205 206 207 208

  return pMeta->row;
}

S
slguan 已提交
209
static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
S
slguan 已提交
210 211 212
  SRowMeta rowMeta;
  rowMeta.rowSize = pOper->rowSize;
  rowMeta.row = pOper->pObj;
H
hzcheng 已提交
213

S
slguan 已提交
214 215 216 217 218 219
  pthread_mutex_lock(&pTable->mutex);
  (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta);
  sdbIncRef(pTable, pOper->pObj);
  pTable->numOfRows++;
  pthread_mutex_unlock(&pTable->mutex);

S
slguan 已提交
220 221
  sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
           pTable->numOfRows);
S
slguan 已提交
222 223 224 225

  (*pTable->insertFp)(pOper);
  return TSDB_CODE_SUCCESS;
}
H
hzcheng 已提交
226

S
slguan 已提交
227
static int32_t sdbDeleteLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
H
hzcheng 已提交
228
  pthread_mutex_lock(&pTable->mutex);
S
slguan 已提交
229 230 231
  (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj);
  pTable->numOfRows--;
  pthread_mutex_unlock(&pTable->mutex);
H
hzcheng 已提交
232

S
slguan 已提交
233 234
  sdbTrace("table:%s, delete record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
           pTable->numOfRows);
S
slguan 已提交
235 236

  (*pTable->deleteFp)(pOper);
S
slguan 已提交
237
  int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
S
slguan 已提交
238 239
  *updateEnd = 1;
  sdbDecRef(pTable, pOper->pObj);
S
slguan 已提交
240

S
slguan 已提交
241 242 243
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
244 245 246
static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
  sdbTrace("table:%s, update record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
           pTable->numOfRows);
S
slguan 已提交
247 248 249 250 251

  (*pTable->updateFp)(pOper);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
252 253
static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
  int32_t code = 0;
S
slguan 已提交
254

S
slguan 已提交
255 256 257
  pthread_mutex_lock(&tsSdbObj->mutex);
  tsSdbObj->version++;
  pHead->version = tsSdbObj->version;
S
slguan 已提交
258

S
slguan 已提交
259
  code = mpeerForwardReqToPeer(pHead);
S
slguan 已提交
260
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
261
    pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
262 263 264 265
    sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
             sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
    return code;
  }
S
slguan 已提交
266

S
slguan 已提交
267 268
  code = walWrite(tsSdbObj->wal, pHead);
  pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
269

S
slguan 已提交
270 271 272
  if (code < 0) {
    sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName,
             sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
S
slguan 已提交
273
  } else {
S
slguan 已提交
274 275 276
    sdbTrace("table:%s, success to %s record:%s to file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action),
             sdbGetkeyStr(pTable, pHead->cont), pHead->version);
  }
S
slguan 已提交
277

S
slguan 已提交
278
  walFsync(tsSdbObj->wal);
S
slguan 已提交
279
  taosFreeQitem(pHead);
S
slguan 已提交
280 281
  return code;
}
S
slguan 已提交
282

S
slguan 已提交
283
static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
S
slguan 已提交
284 285 286
  pthread_mutex_lock(&tsSdbObj->mutex);
  if (pHead->version <= tsSdbObj->version) {
    pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
287
    return TSDB_CODE_SUCCESS;
S
slguan 已提交
288 289
  } else if (pHead->version != tsSdbObj->version + 1) {
    pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
290 291
    sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64,
             pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version,
S
slguan 已提交
292
             tsSdbObj->version);
S
slguan 已提交
293 294 295
    return TSDB_CODE_OTHERS;
  }

S
slguan 已提交
296
  tsSdbObj->version = pHead->version;
S
slguan 已提交
297 298 299 300 301 302
  sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName,
           sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);

  int32_t code = -1;
  if (action == SDB_ACTION_INSERT) {
    SSdbOperDesc oper = {
S
slguan 已提交
303 304 305
        .rowSize = pHead->len,
        .rowData = pHead->cont,
        .table = pTable,
S
slguan 已提交
306 307 308 309 310
    };
    code = (*pTable->decodeFp)(&oper);
    if (code < 0) {
      sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
               sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
S
slguan 已提交
311
      pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
312 313 314 315 316 317 318 319 320
      return code;
    }

    code = sdbInsertLocal(pTable, &oper);
  } else if (action == SDB_ACTION_DELETE) {
    SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
    assert(rowMeta != NULL && rowMeta->row != NULL);

    SSdbOperDesc oper = {
S
slguan 已提交
321 322
        .table = pTable,
        .pObj = rowMeta->row,
S
slguan 已提交
323
    };
S
slguan 已提交
324

S
slguan 已提交
325 326 327 328
    code = sdbDeleteLocal(pTable, &oper);
  } else if (action == SDB_ACTION_UPDATE) {
    SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
    assert(rowMeta != NULL && rowMeta->row != NULL);
S
slguan 已提交
329

S
slguan 已提交
330
    SSdbOperDesc oper1 = {
S
slguan 已提交
331 332
        .table = pTable,
        .pObj = rowMeta->row,
S
slguan 已提交
333 334 335 336
    };
    sdbDeleteLocal(pTable, &oper1);

    SSdbOperDesc oper2 = {
S
slguan 已提交
337 338 339
        .rowSize = pHead->len,
        .rowData = pHead->cont,
        .table = pTable,
S
slguan 已提交
340 341 342 343 344
    };
    code = (*pTable->decodeFp)(&oper2);
    if (code < 0) {
      sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
               sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
S
slguan 已提交
345
      pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
346
      return code;
S
slguan 已提交
347
    }
S
slguan 已提交
348 349 350
    code = sdbInsertLocal(pTable, &oper2);
  }

S
slguan 已提交
351
  pthread_mutex_unlock(&tsSdbObj->mutex);
S
slguan 已提交
352 353 354
  return code;
}

S
slguan 已提交
355
int sdbProcessWrite(void *param, void *data, int type) {
S
slguan 已提交
356 357 358 359 360 361 362 363 364 365 366
  SWalHead *pHead = data;
  int32_t   tableId = pHead->msgType / 10;
  int32_t   action = pHead->msgType % 10;

  SSdbTable *pTable = sdbGetTableFromId(tableId);
  assert(pTable != NULL);

  if (pHead->version == 0) {
    return sdbProcessWriteFromApp(pTable, pHead, action);
  } else {
    return sdbProcessWriteFromWal(pTable, pHead, action);
S
slguan 已提交
367
  }
S
slguan 已提交
368
}
H
hzcheng 已提交
369

S
slguan 已提交
370 371 372 373 374 375 376 377
int32_t sdbInsertRow(SSdbOperDesc *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;

  if (sdbGetRow(pTable, pOper->pObj)) {
    sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj));
    sdbDecRef(pTable, pOper->pObj);
    return TSDB_CODE_ALREADY_THERE;
S
slguan 已提交
378
  }
S
slguan 已提交
379

S
slguan 已提交
380
  if (pTable->keyType == SDB_KEY_AUTO) {
S
slguan 已提交
381
    pthread_mutex_lock(&pTable->mutex);
S
slguan 已提交
382
    *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
S
slguan 已提交
383 384 385 386 387

    // let vgId increase from 2
    if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
      *((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
    }
S
slguan 已提交
388
    pthread_mutex_unlock(&pTable->mutex);
S
slguan 已提交
389
  }
H
hzcheng 已提交
390

S
slguan 已提交
391
  if (pOper->type == SDB_OPER_GLOBAL) {
S
slguan 已提交
392
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
S
slguan 已提交
393
    SWalHead *pHead = taosAllocateQitem(size);
S
slguan 已提交
394 395 396 397 398 399 400 401
    pHead->version = 0;
    pHead->len = pOper->rowSize;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;

    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

S
slguan 已提交
402
    int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
S
slguan 已提交
403 404
    if (code < 0) return code;
  } 
S
slguan 已提交
405
  
S
slguan 已提交
406
  return sdbInsertLocal(pTable, pOper);
H
hzcheng 已提交
407 408 409
}

// row here can be object or null-terminated string
S
slguan 已提交
410 411
int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
H
hzcheng 已提交
412 413
  if (pTable == NULL) return -1;

S
slguan 已提交
414
  SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj);
H
hzcheng 已提交
415
  if (pMeta == NULL) {
S
slguan 已提交
416
    sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
H
hzcheng 已提交
417 418 419
    return -1;
  }

S
slguan 已提交
420
  void * pMetaRow = pMeta->row;
H
hzcheng 已提交
421 422
  assert(pMetaRow != NULL);

S
slguan 已提交
423
  if (pOper->type == SDB_OPER_GLOBAL) {
S
slguan 已提交
424 425
    int32_t rowSize = 0;
    switch (pTable->keyType) {
S
slguan 已提交
426
      case SDB_KEY_STRING:
S
slguan 已提交
427 428
        rowSize = strlen((char *)pOper->pObj) + 1;
        break;
S
slguan 已提交
429
      case SDB_KEY_INT:
S
slguan 已提交
430
      case SDB_KEY_AUTO:
S
slguan 已提交
431 432 433 434 435
        rowSize = sizeof(uint64_t);
        break;
      default:
        return -1;
    }
S
slguan 已提交
436

S
slguan 已提交
437
    int32_t   size = sizeof(SWalHead) + rowSize;
S
slguan 已提交
438
    SWalHead *pHead = taosAllocateQitem(size);
S
slguan 已提交
439 440 441 442
    pHead->version = 0;
    pHead->len = rowSize;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
    memcpy(pHead->cont, pOper->pObj, rowSize);
H
hzcheng 已提交
443

S
slguan 已提交
444
    int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
S
slguan 已提交
445 446 447 448
    if (code < 0) return code;
  } 
  
  return sdbDeleteLocal(pTable, pOper);
H
hzcheng 已提交
449 450
}

S
slguan 已提交
451 452 453
int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
  SSdbTable *pTable = (SSdbTable *)pOper->table;
  if (pTable == NULL) return -1;
H
hzcheng 已提交
454

S
slguan 已提交
455
  SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj);
H
hzcheng 已提交
456
  if (pMeta == NULL) {
S
slguan 已提交
457
    sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
H
hzcheng 已提交
458 459 460
    return -1;
  }

S
slguan 已提交
461
  void * pMetaRow = pMeta->row;
H
hzcheng 已提交
462 463
  assert(pMetaRow != NULL);

S
slguan 已提交
464
  if (pOper->type == SDB_OPER_GLOBAL) {
S
slguan 已提交
465
    int32_t   size = sizeof(SWalHead) + pTable->maxRowSize;
S
slguan 已提交
466
    SWalHead *pHead = taosAllocateQitem(size);
S
slguan 已提交
467 468
    pHead->version = 0;
    pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
H
hzcheng 已提交
469

S
slguan 已提交
470 471 472 473
    pOper->rowData = pHead->cont;
    (*pTable->encodeFp)(pOper);
    pHead->len = pOper->rowSize;

S
slguan 已提交
474
    int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
S
slguan 已提交
475
    if (code < 0) return code;
S
slguan 已提交
476
  }
S
slguan 已提交
477
  
S
slguan 已提交
478 479
  return sdbUpdateLocal(pTable, pOper);
}
S
slguan 已提交
480

S
slguan 已提交
481 482 483
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
  SSdbTable *pTable = (SSdbTable *)handle;
  SRowMeta * pMeta;
S
slguan 已提交
484

S
slguan 已提交
485 486
  *ppRow = NULL;
  if (pTable == NULL) return NULL;
H
hzcheng 已提交
487

S
slguan 已提交
488 489
  pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta);
  if (pMeta == NULL) return NULL;
S
slguan 已提交
490

S
slguan 已提交
491 492
  *ppRow = pMeta->row;
  sdbIncRef(handle, pMeta->row);
H
hzcheng 已提交
493

S
slguan 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
  return pNode;
}

void *sdbOpenTable(SSdbTableDesc *pDesc) {
  SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
  if (pTable == NULL) return NULL;

  strcpy(pTable->tableName, pDesc->tableName);
  pTable->keyType      = pDesc->keyType;
  pTable->tableId      = pDesc->tableId;
  pTable->hashSessions = pDesc->hashSessions;
  pTable->maxRowSize   = pDesc->maxRowSize;
  pTable->refCountPos  = pDesc->refCountPos;
  pTable->insertFp     = pDesc->insertFp;
  pTable->deleteFp     = pDesc->deleteFp;
  pTable->updateFp     = pDesc->updateFp;
  pTable->encodeFp     = pDesc->encodeFp;
  pTable->decodeFp     = pDesc->decodeFp;
  pTable->destroyFp    = pDesc->destroyFp;
S
slguan 已提交
513
  pTable->restoredFp   = pDesc->restoredFp;
S
slguan 已提交
514 515 516 517 518 519 520 521 522 523
  
  if (sdbInitIndexFp[pTable->keyType] != NULL) {
    pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta));
  }

  pthread_mutex_init(&pTable->mutex, NULL);

  tsSdbNumOfTables++;
  tsSdbTableList[pTable->tableId] = pTable;
  return pTable;
H
hzcheng 已提交
524 525 526 527 528
}

void sdbCloseTable(void *handle) {
  SSdbTable *pTable = (SSdbTable *)handle;
  if (pTable == NULL) return;
S
slguan 已提交
529 530 531
  
  tsSdbNumOfTables--;
  tsSdbTableList[pTable->tableId] = NULL;
H
hzcheng 已提交
532

S
slguan 已提交
533
  void *pNode = NULL;
H
hzcheng 已提交
534
  while (1) {
S
slguan 已提交
535
    SRowMeta *pMeta;
536 537
    pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta);
    if (pMeta == NULL) break;
S
slguan 已提交
538 539

    SSdbOperDesc oper = {
540
      .pObj = pMeta->row,
S
slguan 已提交
541 542
      .table = pTable,
    };
543

S
slguan 已提交
544
    (*pTable->destroyFp)(&oper);
H
hzcheng 已提交
545 546
  }

S
slguan 已提交
547 548 549
  if (sdbCleanUpIndexFp[pTable->keyType]) {
    (*sdbCleanUpIndexFp[pTable->keyType])(pTable->iHandle);
  }
H
hzcheng 已提交
550 551 552

  pthread_mutex_destroy(&pTable->mutex);

S
slguan 已提交
553
  sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbNumOfTables);
S
slguan 已提交
554
  free(pTable);
H
hzcheng 已提交
555
}