tdbTest.cpp 15.8 KB
Newer Older
1
#include <gtest/gtest.h>
H
Hongze Cheng 已提交
2

H
Hongze Cheng 已提交
3
#define ALLOW_FORBID_FUNC
H
Hongze Cheng 已提交
4
#include "os.h"
H
Hongze Cheng 已提交
5
#include "tdb.h"
H
more  
Hongze Cheng 已提交
6

H
Hongze Cheng 已提交
7
#include <shared_mutex>
H
Hongze Cheng 已提交
8
#include <string>
H
Hongze Cheng 已提交
9 10
#include <thread>
#include <vector>
H
Hongze Cheng 已提交
11

H
Hongze Cheng 已提交
12 13 14 15 16 17 18
// Header of one node in the test memory pool, a circular doubly-linked list.
// The same type is used for the pool's sentinel node (created by openPool())
// and for the header that poolMalloc() places in front of every allocation.
typedef struct SPoolMem {
  int64_t          size;  // allocation node: header + payload bytes; sentinel: running total of all nodes
  struct SPoolMem *prev;  // previous node in the ring (the sentinel links to itself when empty)
  struct SPoolMem *next;  // next node in the ring
} SPoolMem;

// Allocate and initialise an empty pool.
//
// The returned node is the sentinel of a circular doubly-linked list: it links
// to itself and its `size` (the running byte total) starts at 0.
//
// Returns the sentinel, or NULL if the allocation fails.
static SPoolMem *openPool() {
  SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
  if (pPool == NULL) {
    // Report the failure instead of dereferencing a NULL pointer below.
    return NULL;
  }

  pPool->prev = pPool->next = pPool;
  pPool->size = 0;

  return pPool;
}

H
Hongze Cheng 已提交
27
// Release every allocation still linked into the pool, leaving only the
// sentinel node. Afterwards the pool's byte counter must be back at zero.
static void clearPool(SPoolMem *pPool) {
  // Always take the node right after the sentinel; the ring is empty once the
  // sentinel points back at itself.
  for (SPoolMem *pNode = pPool->next; pNode != pPool; pNode = pPool->next) {
    // Unlink the node from the ring.
    pNode->next->prev = pNode->prev;
    pNode->prev->next = pNode->next;

    // Keep the running total in step with what is still linked.
    pPool->size -= pNode->size;

    taosMemoryFree(pNode);
  }

  assert(pPool->size == 0);
}
H
Hongze Cheng 已提交
44

H
Hongze Cheng 已提交
45 46
// Destroy a pool: free every outstanding allocation, then the sentinel node
// itself. Passing NULL is a no-op, so a pool that was never created can be
// "closed" safely.
static void closePool(SPoolMem *pPool) {
  if (pPool == NULL) return;

  clearPool(pPool);
  taosMemoryFree(pPool);
}

H
Hongze Cheng 已提交
50
// Allocation callback handed to TXN: allocate `size` payload bytes and track
// them in the pool.
//
//   arg   the pool sentinel (SPoolMem *)
//   size  number of payload bytes requested
//
// Each allocation is prefixed by an SPoolMem header that is linked in directly
// after the sentinel; the sentinel's `size` is increased by header + payload.
//
// Returns a pointer to the payload (just past the header), or NULL when no
// pool was supplied or the underlying allocation fails.
static void *poolMalloc(void *arg, size_t size) {
  SPoolMem *pPool = (SPoolMem *)arg;
  if (pPool == NULL) return NULL;

  SPoolMem *pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
  if (pMem == NULL) {
    // assert(0) disappears under NDEBUG and would let the NULL be dereferenced
    // below; report the failure to the caller instead.
    return NULL;
  }

  pMem->size = sizeof(*pMem) + size;

  // Insert the new node right after the sentinel.
  pMem->next = pPool->next;
  pMem->prev = pPool;
  pPool->next->prev = pMem;
  pPool->next = pMem;

  pPool->size += pMem->size;

  // The payload starts immediately after the header.
  return (void *)(&pMem[1]);
}

static void poolFree(void *arg, void *ptr) {
  SPoolMem *pPool = (SPoolMem *)arg;
  SPoolMem *pMem;

  pMem = &(((SPoolMem *)ptr)[-1]);

  pMem->next->prev = pMem->prev;
  pMem->prev->next = pMem->next;
  pPool->size -= pMem->size;

H
Hongze Cheng 已提交
82
  taosMemoryFree(pMem);
H
Hongze Cheng 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
}

// Key comparator that orders keys by the integer following a 3-byte prefix
// (the tests use keys of the form "key<N>").
//
//   pKey1/kLen1  first key and its length in bytes
//   pKey2/kLen2  second key and its length in bytes
//
// Returns -1, 0 or 1 when the first number is less than, equal to or greater
// than the second. The numeric part is parsed with std::stoi.
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
  // Copy the bytes after the prefix; the keys are not NUL-terminated.
  const std::string digits1((char *)pKey1 + 3, kLen1 - 3);
  const std::string digits2((char *)pKey2 + 3, kLen2 - 3);

  const int lhs = std::stoi(digits1);
  const int rhs = std::stoi(digits2);

  // Three-way result without branching: (1,0) -> 1, (0,1) -> -1, (0,0) -> 0.
  return (lhs > rhs) - (lhs < rhs);
}

// Byte-wise key comparator.
//
// Compares the common prefix of both keys with memcmp and returns that result
// when it is non-zero. If the prefixes are equal, the shorter key sorts first
// (-1 / 1), and keys of equal length compare equal (0).
//
// Both keys must be non-NULL and have a positive length.
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
  ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);

  const int commonLen = (keyLen1 < keyLen2) ? keyLen1 : keyLen2;
  const int prefixCmp = memcmp(pKey1, pKey2, commonLen);

  // A difference inside the common prefix decides the order.
  if (prefixCmp != 0) return prefixCmp;

  // Identical prefix: fall back to comparing lengths.
  if (keyLen1 == keyLen2) return 0;
  return (keyLen1 < keyLen2) ? -1 : 1;
}
H
more  
Hongze Cheng 已提交
121

H
Hongze Cheng 已提交
122
// Bulk-insert test.
//
// Writes nData "key<i>" -> "value<i>" pairs through a pool-backed write
// transaction, committing and restarting the transaction whenever the pool has
// grown to poolLimit bytes. Afterwards every pair is verified by point lookup
// and a full cursor scan must visit exactly nData entries.
TEST(tdb_test, DISABLED_simple_insert1) {
  int           ret;
  TDB          *pEnv;
  TTB          *pDb;
  tdb_cmpr_fn_t compFunc;
  int           nData = 1000000;
  TXN           txn;

  taosRemoveDir("tdb");

  // Open Env
  ret = tdbOpen("tdb", 4096, 64, &pEnv);
  GTEST_ASSERT_EQ(ret, 0);

  // Create a database
  compFunc = tKeyCmpr;
  ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  {
    char      key[64];
    char      val[64];
    int64_t   poolLimit = 4096;  // commit once the pool tracks this many bytes
    int64_t   txnid = 0;
    SPoolMem *pPool;

    // open the pool
    pPool = openPool();

    // start a transaction
    txnid++;
    tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
    tdbBegin(pEnv, &txn);

    for (int iData = 1; iData <= nData; iData++) {
      snprintf(key, sizeof(key), "key%d", iData);
      snprintf(val, sizeof(val), "value%d", iData);
      ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
      GTEST_ASSERT_EQ(ret, 0);

      // if pool is full, commit the transaction and start a new one
      if (pPool->size >= poolLimit) {
        // commit current transaction
        tdbCommit(pEnv, &txn);
        tdbTxnClose(&txn);

        // start a new transaction
        clearPool(pPool);
        txnid++;
        tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
        tdbBegin(pEnv, &txn);
      }
    }

    // commit the transaction
    tdbCommit(pEnv, &txn);
    tdbTxnClose(&txn);

    {  // Query the data
      void *pVal = NULL;
      int   vLen;

      for (int i = 1; i <= nData; i++) {
        snprintf(key, sizeof(key), "key%d", i);
        snprintf(val, sizeof(val), "value%d", i);

        ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
        // Let gtest report the failure; a hard ASSERT here would abort the
        // whole test binary before the framework could record it.
        GTEST_ASSERT_EQ(ret, 0);

        GTEST_ASSERT_EQ(vLen, (int)strlen(val));
        GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
      }

      // pVal is reused across lookups and released once at the end.
      tdbFree(pVal);
    }

    {  // Iterate to query the DB data
      TBC  *pDBC;
      void *pKey = NULL;
      void *pVal = NULL;
      int   vLen, kLen;
      int   count = 0;

      ret = tdbTbcOpen(pDb, &pDBC, NULL);
      GTEST_ASSERT_EQ(ret, 0);

      tdbTbcMoveToFirst(pDBC);

      for (;;) {
        ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
        if (ret < 0) break;

        // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
        // std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
        // std::cout << std::endl;

        count++;
      }

      GTEST_ASSERT_EQ(count, nData);

      tdbTbcClose(pDBC);

      tdbFree(pKey);
      tdbFree(pVal);
    }

    // The last transaction is closed, so the pool (sentinel included) can go.
    closePool(pPool);
  }

  ret = tdbTbDrop(pDb);
  GTEST_ASSERT_EQ(ret, 0);

  // Close a database
  tdbTbClose(pDb);

  // Close Env
  ret = tdbClose(pEnv);
  GTEST_ASSERT_EQ(ret, 0);
}

H
Hongze Cheng 已提交
242
// Single-transaction insert test.
//
// Inserts nData "key<i>" -> "value<i>" pairs in one pool-backed write
// transaction using the byte-wise comparator, scans the table with a cursor
// while the transaction is still open (expecting nData entries), and only
// then commits.
TEST(tdb_test, DISABLED_simple_insert2) {
  int           ret;
  TDB          *pEnv;
  TTB          *pDb;
  tdb_cmpr_fn_t compFunc;
  int           nData = 1000000;
  TXN           txn;
  SPoolMem     *pPool;  // must outlive the commit below, so it lives at test scope

  taosRemoveDir("tdb");

  // Open Env
  ret = tdbOpen("tdb", 1024, 10, &pEnv);
  GTEST_ASSERT_EQ(ret, 0);

  // Create a database
  compFunc = tDefaultKeyCmpr;
  ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  {
    char    key[64];
    char    val[64];
    int64_t txnid = 0;

    // open the pool
    pPool = openPool();

    // start a transaction
    txnid++;
    tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
    tdbBegin(pEnv, &txn);

    for (int iData = 1; iData <= nData; iData++) {
      snprintf(key, sizeof(key), "key%d", iData);
      snprintf(val, sizeof(val), "value%d", iData);
      ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
      GTEST_ASSERT_EQ(ret, 0);
    }

    {  // Iterate to query the DB data
      TBC  *pDBC;
      void *pKey = NULL;
      void *pVal = NULL;
      int   vLen, kLen;
      int   count = 0;

      ret = tdbTbcOpen(pDb, &pDBC, NULL);
      GTEST_ASSERT_EQ(ret, 0);

      tdbTbcMoveToFirst(pDBC);

      for (;;) {
        ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
        if (ret < 0) break;

        // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
        // std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
        // std::cout << std::endl;

        count++;
      }

      GTEST_ASSERT_EQ(count, nData);

      tdbTbcClose(pDBC);

      tdbFree(pKey);
      tdbFree(pVal);
    }
  }

  // commit the transaction
  tdbCommit(pEnv, &txn);
  tdbTxnClose(&txn);

  // The transaction is finished; release the pool that backed it.
  closePool(pPool);

  ret = tdbTbDrop(pDb);
  GTEST_ASSERT_EQ(ret, 0);

  // Close a database
  tdbTbClose(pDb);

  // Close Env
  ret = tdbClose(pEnv);
  GTEST_ASSERT_EQ(ret, 0);
}

H
Hongze Cheng 已提交
329
// Delete test.
//
// Inserts nKV "key<i>" -> "data<i>" pairs, verifies them by lookup, deletes
// keys 31..nKV-1 (highest first), checks that lookups succeed only for keys
// 0..30, then prints every remaining entry via a cursor before committing.
TEST(tdb_test, DISABLED_simple_delete1) {
  int       ret;
  TTB      *pDb;
  char      key[128];
  char      data[128];
  TXN       txn;
  TDB      *pEnv;
  SPoolMem *pPool;
  void     *pKey = NULL;
  void     *pData = NULL;
  int       nKey;
  TBC      *pDbc;
  int       nData;
  int       nKV = 69;

  taosRemoveDir("tdb");

  pPool = openPool();

  // open env
  ret = tdbOpen("tdb", 1024, 256, &pEnv);
  GTEST_ASSERT_EQ(ret, 0);

  // open database
  ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  tdbBegin(pEnv, &txn);

  // loop to insert batch data
  for (int iData = 0; iData < nKV; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d", iData);
    ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
    GTEST_ASSERT_EQ(ret, 0);
  }

  // query the data
  for (int iData = 0; iData < nKV; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d", iData);

    ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
    GTEST_ASSERT_EQ(ret, 0);
    // Check the length first so a truncated value cannot pass the memcmp.
    GTEST_ASSERT_EQ(nData, (int)strlen(data));
    GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0);
  }

  // loop to delete some data
  for (int iData = nKV - 1; iData > 30; iData--) {
    snprintf(key, sizeof(key), "key%d", iData);

    ret = tdbTbDelete(pDb, key, strlen(key), &txn);
    GTEST_ASSERT_EQ(ret, 0);
  }

  // query the data
  for (int iData = 0; iData < nKV; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);

    ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
    if (iData <= 30) {
      GTEST_ASSERT_EQ(ret, 0);
    } else {
      GTEST_ASSERT_EQ(ret, -1);
    }
  }

  // loop to iterate the data
  ret = tdbTbcOpen(pDb, &pDbc, NULL);
  GTEST_ASSERT_EQ(ret, 0);

  ret = tdbTbcMoveToFirst(pDbc);
  GTEST_ASSERT_EQ(ret, 0);

  // Release the lookup buffer before the pointers are reset for the cursor;
  // overwriting pData with NULL alone would leak it.
  tdbFree(pData);
  pKey = NULL;
  pData = NULL;
  for (;;) {
    ret = tdbTbcNext(pDbc, &pKey, &nKey, &pData, &nData);
    if (ret < 0) break;

    std::cout.write((char *)pKey, nKey) /* << " " << kLen */ << " ";
    std::cout.write((char *)pData, nData) /* << " " << vLen */;
    std::cout << std::endl;
  }

  tdbTbcClose(pDbc);

  // Buffers filled by the cursor are owned by the caller.
  tdbFree(pKey);
  tdbFree(pData);

  tdbCommit(pEnv, &txn);
  // Close the transaction like the other tests in this file do after commit.
  tdbTxnClose(&txn);

  closePool(pPool);

  tdbTbClose(pDb);
  ret = tdbClose(pEnv);
  GTEST_ASSERT_EQ(ret, 0);
}

H
Hongze Cheng 已提交
424
// Upsert test.
//
// Inserts nData "key<i>" -> "data<i>" pairs, verifies them, overwrites every
// key with "data<i>-u" through tdbTbUpsert, commits, and verifies that each
// lookup now returns the updated value.
TEST(tdb_test, DISABLED_simple_upsert1) {
  int       ret;
  TDB      *pEnv;
  TTB      *pDb;
  int       nData = 100000;
  char      key[64];
  char      data[64];
  void     *pData = NULL;
  // Length of the value returned by tdbTbGet. This used to be written into
  // nData, which clobbered the loop bound after the first lookup so only a
  // handful of keys were ever exercised.
  int       vLen = 0;
  SPoolMem *pPool;
  TXN       txn;

  taosRemoveDir("tdb");

  // open env
  ret = tdbOpen("tdb", 4096, 64, &pEnv);
  GTEST_ASSERT_EQ(ret, 0);

  // open database
  ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  pPool = openPool();
  // insert some data
  tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  tdbBegin(pEnv, &txn);

  for (int iData = 0; iData < nData; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d", iData);
    ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
    GTEST_ASSERT_EQ(ret, 0);
  }

  // query the data
  for (int iData = 0; iData < nData; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d", iData);
    ret = tdbTbGet(pDb, key, strlen(key), &pData, &vLen);
    GTEST_ASSERT_EQ(ret, 0);
    GTEST_ASSERT_EQ(vLen, (int)strlen(data));
    GTEST_ASSERT_EQ(memcmp(pData, data, vLen), 0);
  }

  // upsert some data
  for (int iData = 0; iData < nData; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d-u", iData);
    ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
    GTEST_ASSERT_EQ(ret, 0);
  }

  tdbCommit(pEnv, &txn);

  // query the data
  for (int iData = 0; iData < nData; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(data, sizeof(data), "data%d-u", iData);
    ret = tdbTbGet(pDb, key, strlen(key), &pData, &vLen);
    GTEST_ASSERT_EQ(ret, 0);
    GTEST_ASSERT_EQ(vLen, (int)strlen(data));
    GTEST_ASSERT_EQ(memcmp(pData, data, vLen), 0);
  }

  // Release the lookup buffer, the transaction and the pool that backed it.
  tdbFree(pData);
  tdbTxnClose(&txn);
  closePool(pPool);

  tdbTbClose(pDb);
  ret = tdbClose(pEnv);
  GTEST_ASSERT_EQ(ret, 0);
}

H
Hongze Cheng 已提交
489
// Concurrent read test.
//
// Inserts nData "key<i>" -> "value<i>" pairs in a single write transaction,
// then starts nThreads threads: thread 0 commits the transaction while the
// others each scan the whole table with their own cursor and expect to see
// exactly nData entries.
TEST(tdb_test, multi_thread_query) {
  int           ret;
  TDB          *pEnv;
  TTB          *pDb;
  tdb_cmpr_fn_t compFunc;
  int           nData = 1000000;
  TXN           txn;

  taosRemoveDir("tdb");

  // Open Env
  ret = tdbOpen("tdb", 4096, 10, &pEnv);
  GTEST_ASSERT_EQ(ret, 0);

  // Create a database
  compFunc = tKeyCmpr;
  ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  char      key[64];
  char      val[64];
  SPoolMem *pPool;

  // open the pool
  pPool = openPool();

  // start a transaction (fields are filled in by hand instead of tdbTxnOpen)
  txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
  txn.txnId = -1;
  txn.xMalloc = poolMalloc;
  txn.xFree = poolFree;
  txn.xArg = pPool;
  // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, );
  tdbBegin(pEnv, &txn);

  for (int iData = 1; iData <= nData; iData++) {
    snprintf(key, sizeof(key), "key%d", iData);
    snprintf(val, sizeof(val), "value%d", iData);
    ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
    GTEST_ASSERT_EQ(ret, 0);
  }

  // Reader body: full cursor scan with a private read transaction and pool.
  auto f = [](TTB *pDb, int nData) {
    TBC  *pDBC;
    void *pKey = NULL;
    void *pVal = NULL;
    int   vLen, kLen;
    int   count = 0;
    int   ret;
    TXN   txn;

    SPoolMem *pPool = openPool();
    txn.flags = 0;
    txn.txnId = 0;
    txn.xMalloc = poolMalloc;
    txn.xFree = poolFree;
    txn.xArg = pPool;

    ret = tdbTbcOpen(pDb, &pDBC, &txn);
    GTEST_ASSERT_EQ(ret, 0);

    tdbTbcMoveToFirst(pDBC);

    for (;;) {
      ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
      if (ret < 0) break;

      // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
      // std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
      // std::cout << std::endl;

      count++;
    }

    GTEST_ASSERT_EQ(count, nData);

    tdbTbcClose(pDBC);

    tdbFree(pKey);
    tdbFree(pVal);

    // The cursor is closed; release this thread's private pool.
    closePool(pPool);
  };

  // tdbCommit(pEnv, &txn);

  // multi-thread query
  int                      nThreads = 20;
  std::vector<std::thread> threads;
  for (int i = 0; i < nThreads; i++) {
    if (i == 0) {
      threads.push_back(std::thread(tdbCommit, pEnv, &txn));
    } else {
      threads.push_back(std::thread(f, pDb, nData));
    }
  }

  for (auto &th : threads) {
    th.join();
  }

  // commit the transaction
  // NOTE(review): thread 0 above already committed this transaction, so this
  // is a second commit of the same TXN - confirm that is intended.
  tdbCommit(pEnv, &txn);
  tdbTxnClose(&txn);

  // The write transaction is finished; release the pool that backed it.
  closePool(pPool);

  // Close a database
  tdbTbClose(pDb);

  // Close Env
  ret = tdbClose(pEnv);
  GTEST_ASSERT_EQ(ret, 0);
}

H
Hongze Cheng 已提交
603
// Concurrent insert/scan test (body currently compiled out with #if 0).
//
// One writer thread inserts nData pairs, committing whenever its pool exceeds
// 1 MiB, while the remaining threads repeatedly scan the table and print the
// number of entries they saw. Inserts take the shared mutex exclusively and
// scans take it shared. The `stop` flag is only read and written while the
// mutex is held, so the threads do not race on it.
TEST(tdb_test, DISABLED_multi_thread1) {
#if 0
  int  ret;
  TDB *pDb;
  TTB *pTb;
  int  nData = 10000000;

  std::shared_timed_mutex mutex;

  taosRemoveDir("tdb");

  // Open Env
  ret = tdbOpen("tdb", 512, 1, &pDb);
  GTEST_ASSERT_EQ(ret, 0);

  ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb);
  GTEST_ASSERT_EQ(ret, 0);

  // Writer: inserts all pairs, then raises *stop so the readers terminate.
  // NOTE(review): a failing GTEST_ASSERT_EQ returns from this lambda without
  // setting *stop, which would leave the readers looping forever.
  auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
    TXN       txn = {0};
    char      key[128];
    char      val[128];
    SPoolMem *pPool = openPool();

    txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
    txn.txnId = -1;
    txn.xMalloc = poolMalloc;
    txn.xFree = poolFree;
    txn.xArg = pPool;
    tdbBegin(pDb, &txn);
    for (int iData = 1; iData <= nData; iData++) {
      snprintf(key, sizeof(key), "key%d", iData);
      snprintf(val, sizeof(val), "value%d", iData);
      {
        std::lock_guard<std::shared_timed_mutex> wmutex(*mu);

        int ret = tdbTbInsert(pTb, key, strlen(key), val, strlen(val), &txn);

        GTEST_ASSERT_EQ(ret, 0);
      }

      if (pPool->size > 1024 * 1024) {
        tdbCommit(pDb, &txn);

        clearPool(pPool);
        tdbBegin(pDb, &txn);
      }
    }

    tdbCommit(pDb, &txn);
    closePool(pPool);

    {
      // Publish the flag under the exclusive lock; readers check it while
      // holding the shared lock.
      std::lock_guard<std::shared_timed_mutex> wmutex(*mu);
      *stop = 1;
    }
  };

  // Reader: scans the whole table every 500 ms until *stop is raised.
  auto query = [](TTB *pTb, int *stop, std::shared_timed_mutex *mu) {
    TBC  *pDBC;
    void *pKey = NULL;
    void *pVal = NULL;
    int   vLen, kLen;
    int   ret;
    TXN   txn;

    SPoolMem *pPool = openPool();
    txn.flags = 0;
    txn.txnId = 0;
    txn.xMalloc = poolMalloc;
    txn.xFree = poolFree;
    txn.xArg = pPool;

    for (;;) {
      {
        std::shared_lock<std::shared_timed_mutex> rMutex(*mu);

        // Read the flag under the lock the writer uses to set it.
        if (*stop) break;

        clearPool(pPool);
        int count = 0;

        ret = tdbTbcOpen(pTb, &pDBC, &txn);
        GTEST_ASSERT_EQ(ret, 0);

        tdbTbcMoveToFirst(pDBC);

        for (;;) {
          ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
          if (ret < 0) break;
          count++;
        }

        std::cout << count << std::endl;

        tdbTbcClose(pDBC);
      }

      usleep(500000);
    }

    closePool(pPool);
    tdbFree(pKey);
    tdbFree(pVal);
  };

  std::vector<std::thread> threads;
  int                      nThreads = 10;
  int                      stop = 0;
  for (int i = 0; i < nThreads; i++) {
    if (i == 0) {
      threads.push_back(std::thread(insert, pDb, pTb, nData, &stop, &mutex));
    } else {
      threads.push_back(std::thread(query, pTb, &stop, &mutex));
    }
  }

  for (auto &th : threads) {
    th.join();
  }

  // Close a database
  tdbTbClose(pTb);

  // Close Env
  ret = tdbClose(pDb);
  GTEST_ASSERT_EQ(ret, 0);
#endif
}