tlinearhash.c 14.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tlinearhash.h"
S
Shengliang Guan 已提交
17
#include "tdef.h"
H
Haojun Liao 已提交
18 19 20 21 22 23 24 25 26 27 28
#include "taoserror.h"
#include "tpagedbuf.h"

#define LHASH_CAP_RATIO 0.85

// Always located in memory
typedef struct SLHashBucket {
  SArray        *pPageIdList;
  int32_t        size;            // the number of element in this entry
} SLHashBucket;

29
struct SLHashObj {
H
Haojun Liao 已提交
30 31 32
  SDiskbasedBuf *pBuf;
  _hash_fn_t     hashFn;
  SLHashBucket **pBucket;         // entry list
H
Haojun Liao 已提交
33
  int32_t        tuplesPerPage;
H
Haojun Liao 已提交
34 35 36 37
  int32_t        numOfAlloc;      // number of allocated bucket ptr slot
  int32_t        bits;            // the number of bits used in hash
  int32_t        numOfBuckets;    // the number of buckets
  int64_t        size;            // the number of total items
38
};
H
Haojun Liao 已提交
39 40 41 42 43 44 45 46

/**
 * the data struct for each hash node
 * +-----------+-------+--------+
 * | SLHashNode|  key  |  data  |
 * +-----------+-------+--------+
 */
typedef struct SLHashNode {
H
Haojun Liao 已提交
47 48
  uint16_t   keyLen;
  uint16_t   dataLen;
H
Haojun Liao 已提交
49 50 51
} SLHashNode;

#define GET_LHASH_NODE_KEY(_n)      (((char*)(_n)) + sizeof(SLHashNode))
H
Haojun Liao 已提交
52
#define GET_LHASH_NODE_DATA(_n)     ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
H
Haojun Liao 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66
#define GET_LHASH_NODE_LEN(_n)      (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)

static int32_t doAddNewBucket(SLHashObj* pHashObj);

static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) {
  return hashv & ((1ul << (bits)) - 1);
}

static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
  int32_t v = bucketId - (1ul << (bits - 1));
  ASSERT(v < numOfBuckets);
  return v;
}

H
Haojun Liao 已提交
67 68 69
static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
  int32_t splitBucketId = (1ul << (bits - 1)) ^ bucketId;
  return splitBucketId;
H
Haojun Liao 已提交
70 71 72
}

static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
H
Haojun Liao 已提交
73 74 75 76
  *(uint16_t*) p = keyLen;
  p += sizeof(uint16_t);
  *(uint16_t*) p = size;
  p += sizeof(uint16_t);
H
Haojun Liao 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92

  memcpy(p, key, keyLen);
  p += keyLen;

  memcpy(p, data, size);
}

static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
                             const void* data, int32_t size) {
  int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);

  SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
  ASSERT (pPage != NULL);

  // put to current buf page
  size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
H
Haojun Liao 已提交
93
  ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
H
Haojun Liao 已提交
94 95 96 97 98 99 100

  if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
    releaseBufPage(pHashObj->pBuf, pPage);

    // allocate the overflow buffer page to hold this k/v.
    int32_t newPageId = -1;
    SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
H
Haojun Liao 已提交
101
    if (pNewPage == NULL) {
102
      return terrno;
H
Haojun Liao 已提交
103 104 105 106 107
    }

    taosArrayPush(pBucket->pPageIdList, &newPageId);

    doCopyObject(pNewPage->data, key, keyLen, data, size);
H
Haojun Liao 已提交
108
    pNewPage->num = sizeof(SFilePage) + nodeSize;
H
Haojun Liao 已提交
109 110 111 112

    setBufPageDirty(pNewPage, true);
    releaseBufPage(pHashObj->pBuf, pNewPage);
  } else {
H
Haojun Liao 已提交
113
    char* p = (char*) pPage + pPage->num;
H
Haojun Liao 已提交
114 115 116 117 118 119 120
    doCopyObject(p, key, keyLen, data, size);
    pPage->num += nodeSize;
    setBufPageDirty(pPage, true);
    releaseBufPage(pHashObj->pBuf, pPage);
  }

  pBucket->size += 1;
H
Haojun Liao 已提交
121
//  printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
H
Haojun Liao 已提交
122 123

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
124 125 126
}

static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
H
Haojun Liao 已提交
127
  ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1);
H
Haojun Liao 已提交
128 129 130 131

  int32_t len = GET_LHASH_NODE_LEN(pNode);
  char* p = (char*) pNode + len;

H
Haojun Liao 已提交
132
  char* pEnd = (char*)pPage + pPage->num;
H
Haojun Liao 已提交
133 134 135 136 137 138 139 140 141 142 143
  memmove(pNode, p, (pEnd - p));

  pPage->num -= len;
  if (pPage->num == 0) {
    // this page is empty, could be recycle in the future.
  }

  setBufPageDirty(pPage, true);
  pBucket->size -= 1;
}

H
Haojun Liao 已提交
144
static void doTrimBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
H
Haojun Liao 已提交
145 146 147 148 149 150 151 152 153 154 155
  size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
  if (numOfPages <= 1) {
    return;
  }

  int32_t*   firstPage = taosArrayGet(pBucket->pPageIdList, 0);
  SFilePage* pFirst = getBufPage(pHashObj->pBuf, *firstPage);

  int32_t*   pageId = taosArrayGetLast(pBucket->pPageIdList);
  SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId);

H
Haojun Liao 已提交
156 157 158 159 160 161 162 163
  if (pLast->num <= sizeof(SFilePage)) {
    // this is empty
    dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
    releaseBufPage(pHashObj->pBuf, pFirst);
    taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
    return;
  }

H
Haojun Liao 已提交
164 165 166 167 168 169 170 171
  char*   pStart = pLast->data;
  int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
  while (1) {
    if (pFirst->num + nodeSize < getBufPageSize(pHashObj->pBuf)) {
      char* p = ((char*)pFirst) + pFirst->num;

      SLHashNode* pNode = (SLHashNode*)pStart;
      doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen);
H
Haojun Liao 已提交
172

H
Haojun Liao 已提交
173
      setBufPageDirty(pFirst, true);
H
Haojun Liao 已提交
174 175 176
      setBufPageDirty(pLast, true);

      ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
H
Haojun Liao 已提交
177 178 179

      pFirst->num += nodeSize;
      pLast->num -= nodeSize;
H
Haojun Liao 已提交
180

H
Haojun Liao 已提交
181
      pStart += nodeSize;
H
Haojun Liao 已提交
182
      if (pLast->num <= sizeof(SFilePage)) {
H
Haojun Liao 已提交
183 184
        // this is empty
        dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
H
Haojun Liao 已提交
185
        releaseBufPage(pHashObj->pBuf, pFirst);
H
Haojun Liao 已提交
186 187 188 189 190 191
        taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
        break;
      }

      nodeSize = GET_LHASH_NODE_LEN(pStart);
    } else { // move to the front of pLast page
H
Haojun Liao 已提交
192 193 194 195 196 197 198
      if (pStart != pLast->data) {
        memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart));
        setBufPageDirty(pLast, true);
      }

      releaseBufPage(pHashObj->pBuf, pLast);
      releaseBufPage(pHashObj->pBuf, pFirst);
H
Haojun Liao 已提交
199 200 201 202 203
      break;
    }
  }
}

H
Haojun Liao 已提交
204 205 206 207 208 209 210
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
  if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) {
    int32_t newLen = pHashObj->numOfAlloc * 1.25;
    if (newLen == pHashObj->numOfAlloc) {
      newLen += 4;
    }

wafwerar's avatar
wafwerar 已提交
211
    char* p = taosMemoryRealloc(pHashObj->pBucket, POINTER_BYTES * newLen);
H
Haojun Liao 已提交
212 213 214 215 216 217 218 219 220
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets);
    pHashObj->pBucket = (SLHashBucket**) p;
    pHashObj->numOfAlloc = newLen;
  }

wafwerar's avatar
wafwerar 已提交
221
  SLHashBucket* pBucket = taosMemoryCalloc(1, sizeof(SLHashBucket));
H
Haojun Liao 已提交
222 223 224 225 226 227 228 229 230
  pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;

  pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
  if (pBucket->pPageIdList == NULL || pBucket == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t pageId = -1;
  SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
231 232 233 234
  if (p == NULL) {
    return terrno;
  }

H
Haojun Liao 已提交
235 236
  p->num = sizeof(SFilePage);
  setBufPageDirty(p, true);
H
Haojun Liao 已提交
237

H
Haojun Liao 已提交
238
  releaseBufPage(pHashObj->pBuf, p);
H
Haojun Liao 已提交
239 240 241
  taosArrayPush(pBucket->pPageIdList, &pageId);

  pHashObj->numOfBuckets += 1;
H
Haojun Liao 已提交
242
//  printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
H
Haojun Liao 已提交
243 244 245
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
246
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) {
wafwerar's avatar
wafwerar 已提交
247
  SLHashObj* pHashObj = taosMemoryCalloc(1, sizeof(SLHashObj));
H
Haojun Liao 已提交
248 249 250 251 252
  if (pHashObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
253 254 255 256 257
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    printf("tHash Init failed since %s", terrstr(terrno));
    return NULL;
  }
258 259

  int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, "", tsTempDir);
H
Haojun Liao 已提交
260 261 262 263 264
  if (code != 0) {
    terrno = code;
    return NULL;
  }

H
Haojun Liao 已提交
265
  // disable compress when flushing to disk
H
Haojun Liao 已提交
266 267 268 269 270 271 272 273 274 275 276
  setBufPageCompressOnDisk(pHashObj->pBuf, false);

  /**
   * The number of bits in the hash value, which is used to decide the exact bucket where the object should be located in.
   * The initial value is 0.
   */
  pHashObj->bits   = 0;
  pHashObj->hashFn = fn;
  pHashObj->tuplesPerPage = numOfTuplePerPage;

  pHashObj->numOfAlloc   = 4;  // initial allocated array list
wafwerar's avatar
wafwerar 已提交
277
  pHashObj->pBucket = taosMemoryCalloc(pHashObj->numOfAlloc, POINTER_BYTES);
H
Haojun Liao 已提交
278 279 280 281

  code = doAddNewBucket(pHashObj);
  if (code != TSDB_CODE_SUCCESS) {
    destroyDiskbasedBuf(pHashObj->pBuf);
wafwerar's avatar
wafwerar 已提交
282
    taosMemoryFreeClear(pHashObj);
H
Haojun Liao 已提交
283 284 285 286 287 288 289 290 291 292 293
    terrno = code;
    return NULL;
  }

  return pHashObj;
}

void* tHashCleanup(SLHashObj* pHashObj) {
  destroyDiskbasedBuf(pHashObj->pBuf);
  for(int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
    taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList);
wafwerar's avatar
wafwerar 已提交
294
    taosMemoryFreeClear(pHashObj->pBucket[i]);
H
Haojun Liao 已提交
295 296
  }

wafwerar's avatar
wafwerar 已提交
297 298
  taosMemoryFreeClear(pHashObj->pBucket);
  taosMemoryFreeClear(pHashObj);
H
Haojun Liao 已提交
299 300 301
  return NULL;
}

H
Haojun Liao 已提交
302 303 304 305 306 307 308 309 310 311
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
  ASSERT(pHashObj != NULL && key != NULL);

  if (pHashObj->bits == 0) {
    SLHashBucket* pBucket = pHashObj->pBucket[0];
    doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size);
  } else {
    int32_t hashVal = pHashObj->hashFn(key, keyLen);
    int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits);

H
Haojun Liao 已提交
312 313
    if (v >= pHashObj->numOfBuckets) {
      int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets);
H
Haojun Liao 已提交
314
//      printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
H
Haojun Liao 已提交
315 316
      v = newBucketId;
    }
H
Haojun Liao 已提交
317

H
Haojun Liao 已提交
318 319 320 321
    SLHashBucket* pBucket = pHashObj->pBucket[v];
    int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
H
Haojun Liao 已提交
322 323 324 325 326 327 328 329 330 331
    }
  }

  pHashObj->size += 1;

  // Too many records, needs to bucket split
  if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) {
    int32_t newBucketId = pHashObj->numOfBuckets;

    int32_t code = doAddNewBucket(pHashObj);
H
Haojun Liao 已提交
332 333 334 335
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

H
Haojun Liao 已提交
336 337
    int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
    if (numOfBits > pHashObj->bits) {
H
Haojun Liao 已提交
338
//      printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
H
Haojun Liao 已提交
339 340 341 342
      ASSERT(numOfBits == pHashObj->bits + 1);
      pHashObj->bits = numOfBits;
    }

H
Haojun Liao 已提交
343
    int32_t splitBucketId = doGetRelatedSplitBucketId(newBucketId, pHashObj->bits);
H
Haojun Liao 已提交
344 345 346

    // load all data in this bucket and check if the data needs to relocated into the new bucket
    SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId];
H
Haojun Liao 已提交
347
//    printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
H
Haojun Liao 已提交
348 349 350 351 352 353

    for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
      int32_t    pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
      SFilePage* p = getBufPage(pHashObj->pBuf, pageId);

      char* pStart = p->data;
H
Haojun Liao 已提交
354
      while (pStart - ((char*) p) < p->num) {
H
Haojun Liao 已提交
355
        SLHashNode* pNode = (SLHashNode*)pStart;
H
Haojun Liao 已提交
356
        ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0);
H
Haojun Liao 已提交
357 358 359

        char* k = GET_LHASH_NODE_KEY(pNode);
        int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
H
Haojun Liao 已提交
360
        int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
H
Haojun Liao 已提交
361 362 363

        if (v1 != splitBucketId) {  // place it into the new bucket
          ASSERT(v1 == newBucketId);
H
Haojun Liao 已提交
364
//          printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
H
Haojun Liao 已提交
365 366 367 368 369 370

          SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
          doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
                        GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
          doRemoveFromBucket(p, pNode, pBucket);
        } else {
H
Haojun Liao 已提交
371
//          printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
H
Haojun Liao 已提交
372 373 374 375 376 377 378

          int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
          pStart += nodeSize;
        }
      }
      releaseBufPage(pHashObj->pBuf, p);
    }
H
Haojun Liao 已提交
379

H
Haojun Liao 已提交
380
    doTrimBucketPages(pHashObj, pBucket);
H
Haojun Liao 已提交
381
  }
H
Haojun Liao 已提交
382 383

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
384 385 386 387 388 389 390 391 392 393 394 395
}

char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
  ASSERT(pHashObj != NULL && key != NULL && keyLen > 0);
  int32_t hashv = pHashObj->hashFn(key, keyLen);

  int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
  if (bucketId >= pHashObj->numOfBuckets) {
    bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
  }

  SLHashBucket* pBucket = pHashObj->pBucket[bucketId];
396 397 398
  int32_t num = taosArrayGetSize(pBucket->pPageIdList);

  for (int32_t i = 0; i < num; ++i) {
H
Haojun Liao 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
    int32_t    pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
    SFilePage* p = getBufPage(pHashObj->pBuf, pageId);

    char* pStart = p->data;
    while (pStart - p->data < p->num) {
      SLHashNode* pNode = (SLHashNode*)pStart;

      char* k = GET_LHASH_NODE_KEY(pNode);
      if (pNode->keyLen == keyLen && (memcmp(key, k, keyLen) == 0)) {
        releaseBufPage(pHashObj->pBuf, p);
        return GET_LHASH_NODE_DATA(pNode);
      } else {
        pStart += GET_LHASH_NODE_LEN(pStart);
      }
    }

    releaseBufPage(pHashObj->pBuf, p);
  }

  return NULL;
}

int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
H
Haojun Liao 已提交
422
  // todo
423
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
424 425 426 427
}

void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
  printf("==================== linear hash ====================\n");
428
  printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO);
H
Haojun Liao 已提交
429 430 431 432 433

  dBufSetPrintInfo(pHashObj->pBuf);

  if (type == LINEAR_HASH_DATA) {
    for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
H
Haojun Liao 已提交
434 435
//      printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
//             (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
H
Haojun Liao 已提交
436 437 438 439
    }
  } else {
    dBufPrintStatis(pHashObj->pBuf);
  }
440
}