/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "tlinearhash.h" #include "tdef.h" #include "taoserror.h" #include "tpagedbuf.h" #define LHASH_CAP_RATIO 0.85 // Always located in memory typedef struct SLHashBucket { SArray *pPageIdList; int32_t size; // the number of element in this entry } SLHashBucket; typedef struct SLHashObj { SDiskbasedBuf *pBuf; _hash_fn_t hashFn; SLHashBucket **pBucket; // entry list int32_t tuplesPerPage; int32_t numOfAlloc; // number of allocated bucket ptr slot int32_t bits; // the number of bits used in hash int32_t numOfBuckets; // the number of buckets int64_t size; // the number of total items } SLHashObj; /** * the data struct for each hash node * +-----------+-------+--------+ * | SLHashNode| key | data | * +-----------+-------+--------+ */ typedef struct SLHashNode { uint16_t keyLen; uint16_t dataLen; } SLHashNode; #define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode)) #define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen) #define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen) static int32_t doAddNewBucket(SLHashObj* pHashObj); static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) { return hashv & ((1ul << (bits)) - 1); } static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) { int32_t v = bucketId - (1ul << (bits - 1)); ASSERT(v < numOfBuckets); return v; } static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) { int32_t splitBucketId = (1ul << (bits - 1)) ^ bucketId; return splitBucketId; } static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) { *(uint16_t*) p = keyLen; p += sizeof(uint16_t); *(uint16_t*) p = size; p += sizeof(uint16_t); memcpy(p, key, keyLen); p += keyLen; memcpy(p, data, size); } static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen, const void* data, int32_t size) { int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList); SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId); ASSERT (pPage != NULL); // put to current buf page size_t nodeSize = sizeof(SLHashNode) + keyLen + size; ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf)); if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) { releaseBufPage(pHashObj->pBuf, pPage); // allocate the overflow buffer page to hold this k/v. int32_t newPageId = -1; SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); if (pNewPage == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } taosArrayPush(pBucket->pPageIdList, &newPageId); doCopyObject(pNewPage->data, key, keyLen, data, size); pNewPage->num = sizeof(SFilePage) + nodeSize; setBufPageDirty(pNewPage, true); releaseBufPage(pHashObj->pBuf, pNewPage); } else { char* p = (char*) pPage + pPage->num; doCopyObject(p, key, keyLen, data, size); pPage->num += nodeSize; setBufPageDirty(pPage, true); releaseBufPage(pHashObj->pBuf, pPage); } pBucket->size += 1; // printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); return TSDB_CODE_SUCCESS; } static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) { ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1); int32_t len = GET_LHASH_NODE_LEN(pNode); char* p = (char*) pNode + len; char* pEnd = (char*)pPage + pPage->num; memmove(pNode, p, (pEnd - p)); pPage->num -= len; if (pPage->num == 0) { // this page is empty, could be recycle in the future. } setBufPageDirty(pPage, true); pBucket->size -= 1; } static void doTrimBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList); if (numOfPages <= 1) { return; } int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0); SFilePage* pFirst = getBufPage(pHashObj->pBuf, *firstPage); int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList); SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId); if (pLast->num <= sizeof(SFilePage)) { // this is empty dBufSetBufPageRecycled(pHashObj->pBuf, pLast); releaseBufPage(pHashObj->pBuf, pFirst); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); return; } char* pStart = pLast->data; int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); while (1) { if (pFirst->num + nodeSize < getBufPageSize(pHashObj->pBuf)) { char* p = ((char*)pFirst) + pFirst->num; SLHashNode* pNode = (SLHashNode*)pStart; doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen); setBufPageDirty(pFirst, true); setBufPageDirty(pLast, true); ASSERT(pLast->num >= nodeSize + sizeof(SFilePage)); pFirst->num += nodeSize; pLast->num -= nodeSize; pStart += nodeSize; if (pLast->num <= sizeof(SFilePage)) { // this is empty dBufSetBufPageRecycled(pHashObj->pBuf, pLast); releaseBufPage(pHashObj->pBuf, pFirst); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); break; } nodeSize = GET_LHASH_NODE_LEN(pStart); } else { // move to the front of pLast page if (pStart != pLast->data) { memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart)); setBufPageDirty(pLast, true); } releaseBufPage(pHashObj->pBuf, pLast); releaseBufPage(pHashObj->pBuf, pFirst); break; } } } static int32_t doAddNewBucket(SLHashObj* pHashObj) { if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) { int32_t newLen = pHashObj->numOfAlloc * 1.25; if (newLen == pHashObj->numOfAlloc) { newLen += 4; } char* p = taosMemoryRealloc(pHashObj->pBucket, POINTER_BYTES * newLen); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets); pHashObj->pBucket = (SLHashBucket**) p; pHashObj->numOfAlloc = newLen; } SLHashBucket* pBucket = taosMemoryCalloc(1, sizeof(SLHashBucket)); pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); if (pBucket->pPageIdList == NULL || pBucket == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t pageId = -1; SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); p->num = sizeof(SFilePage); setBufPageDirty(p, true); releaseBufPage(pHashObj->pBuf, p); taosArrayPush(pBucket->pPageIdList, &pageId); pHashObj->numOfBuckets += 1; // printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); return TSDB_CODE_SUCCESS; } SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) { SLHashObj* pHashObj = taosMemoryCalloc(1, sizeof(SLHashObj)); if (pHashObj == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, "/tmp"); if (code != 0) { terrno = code; return NULL; } // disable compress when flushing to disk setBufPageCompressOnDisk(pHashObj->pBuf, false); /** * The number of bits in the hash value, which is used to decide the exact bucket where the object should be located in. * The initial value is 0. */ pHashObj->bits = 0; pHashObj->hashFn = fn; pHashObj->tuplesPerPage = numOfTuplePerPage; pHashObj->numOfAlloc = 4; // initial allocated array list pHashObj->pBucket = taosMemoryCalloc(pHashObj->numOfAlloc, POINTER_BYTES); code = doAddNewBucket(pHashObj); if (code != TSDB_CODE_SUCCESS) { destroyDiskbasedBuf(pHashObj->pBuf); taosMemoryFreeClear(pHashObj); terrno = code; return NULL; } return pHashObj; } void* tHashCleanup(SLHashObj* pHashObj) { destroyDiskbasedBuf(pHashObj->pBuf); for(int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList); taosMemoryFreeClear(pHashObj->pBucket[i]); } taosMemoryFreeClear(pHashObj->pBucket); taosMemoryFreeClear(pHashObj); return NULL; } int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) { ASSERT(pHashObj != NULL && key != NULL); if (pHashObj->bits == 0) { SLHashBucket* pBucket = pHashObj->pBucket[0]; doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size); } else { int32_t hashVal = pHashObj->hashFn(key, keyLen); int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits); if (v >= pHashObj->numOfBuckets) { int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); // printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId); v = newBucketId; } SLHashBucket* pBucket = pHashObj->pBucket[v]; int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); if (code != TSDB_CODE_SUCCESS) { return code; } } pHashObj->size += 1; // Too many records, needs to bucket split if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) { int32_t newBucketId = pHashObj->numOfBuckets; int32_t code = doAddNewBucket(pHashObj); if (code != TSDB_CODE_SUCCESS) { return code; } int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2)); if (numOfBits > pHashObj->bits) { // printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); ASSERT(numOfBits == pHashObj->bits + 1); pHashObj->bits = numOfBits; } int32_t splitBucketId = doGetRelatedSplitBucketId(newBucketId, pHashObj->bits); // load all data in this bucket and check if the data needs to relocated into the new bucket SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId]; // printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); SFilePage* p = getBufPage(pHashObj->pBuf, pageId); char* pStart = p->data; while (pStart - ((char*) p) < p->num) { SLHashNode* pNode = (SLHashNode*)pStart; ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0); char* k = GET_LHASH_NODE_KEY(pNode); int32_t hashv = pHashObj->hashFn(k, pNode->keyLen); int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits); if (v1 != splitBucketId) { // place it into the new bucket ASSERT(v1 == newBucketId); // printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, GET_LHASH_NODE_KEY(pNode), pNode->dataLen); doRemoveFromBucket(p, pNode, pBucket); } else { // printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); pStart += nodeSize; } } releaseBufPage(pHashObj->pBuf, p); } doTrimBucketPages(pHashObj, pBucket); } return TSDB_CODE_SUCCESS; } char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) { ASSERT(pHashObj != NULL && key != NULL && keyLen > 0); int32_t hashv = pHashObj->hashFn(key, keyLen); int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits); if (bucketId >= pHashObj->numOfBuckets) { bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets); } SLHashBucket* pBucket = pHashObj->pBucket[bucketId]; for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); SFilePage* p = getBufPage(pHashObj->pBuf, pageId); char* pStart = p->data; while (pStart - p->data < p->num) { SLHashNode* pNode = (SLHashNode*)pStart; char* k = GET_LHASH_NODE_KEY(pNode); if (pNode->keyLen == keyLen && (memcmp(key, k, keyLen) == 0)) { releaseBufPage(pHashObj->pBuf, p); return GET_LHASH_NODE_DATA(pNode); } else { pStart += GET_LHASH_NODE_LEN(pStart); } } releaseBufPage(pHashObj->pBuf, p); } return NULL; } int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) { // todo return TSDB_CODE_SUCCESS; } void tHashPrint(const SLHashObj* pHashObj, int32_t type) { printf("==================== linear hash ====================\n"); printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); dBufSetPrintInfo(pHashObj->pBuf); if (type == LINEAR_HASH_DATA) { for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { // printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, // (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); } } else { dBufPrintStatis(pHashObj->pBuf); } }