提交 09e91f9a 编写于 作者: H Hongze Cheng

TD-1437

上级 dee16dd4
...@@ -411,9 +411,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -411,9 +411,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
} }
if (cond.start != NULL) { if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_ASC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else { } else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
} }
if (cond.start != NULL) { if (cond.start != NULL) {
...@@ -468,7 +468,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -468,7 +468,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
comp = true; comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) { while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0); comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
......
...@@ -37,38 +37,25 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); ...@@ -37,38 +37,25 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
int32_t level = 0;
int32_t headSize = 0;
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem; SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL; STableData *pTableData = NULL;
SSkipList * pSList = NULL; // SSkipList * pSList = NULL;
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && // if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { // pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
pTableData = pMemTable->tData[TABLE_TID(pTable)]; // pTableData = pMemTable->tData[TABLE_TID(pTable)];
pSList = pTableData->pData; // pSList = pTableData->pData;
} // }
tSkipListNewNodeInfo(pSList, &level, &headSize);
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) { if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
free(pNode);
return -1; return -1;
} }
pNode->level = level;
dataRowCpy(pRow, row); dataRowCpy(pRow, row);
*(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;
// Operations above may change pRepo->mem, retake those values // Operations above may change pRepo->mem, retake those values
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
...@@ -77,7 +64,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -77,7 +64,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
} }
...@@ -97,7 +83,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -97,7 +83,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
" to table %s while create new table data object since %s", " to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
...@@ -106,20 +91,19 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -106,20 +91,19 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
if (tSkipListPut(pTableData->pData, pNode) == NULL) { int64_t oldSize = SL_GET_SIZE(pTableData->pData);
if (tSkipListPut(pTableData->pData, (void *)(&pRow), sizeof(void *)) == NULL) {
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
} else { } else {
int64_t deltaSize = SL_GET_SIZE(pTableData->pData) - oldSize;
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows++; pMemTable->numOfRows += deltaSize;
if (pTableData->keyFirst > key) pTableData->keyFirst = key; if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key; if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows++; pTableData->numOfRows += deltaSize;
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
} }
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
...@@ -439,7 +423,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -439,7 +423,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->numOfRows = 0; pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], /*SL_DISCARD_DUP_KEY*/ SL_APPEND_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -690,7 +690,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -690,7 +690,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
} }
pTable->tagVal = NULL; pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -892,23 +892,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper ...@@ -892,23 +892,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
pTable->pSuper = pSTable; pTable->pSuper = pSTable;
int32_t level = 0; tSkipListPut(pSTable->pIndex, (void *)(&pTable), sizeof(STable *));
int32_t headSize = 0;
tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize);
// NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
// actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
SSkipListNode *pNode = calloc(1, headSize + sizeof(STable *));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode);
if (refSuper) T_REF_INC(pSTable); if (refSuper) T_REF_INC(pSTable);
return 0; return 0;
} }
...@@ -1165,7 +1150,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { ...@@ -1165,7 +1150,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
buf = tdDecodeSchema(buf, &(pTable->tagSchema)); buf = tdDecodeSchema(buf, &(pTable->tagSchema));
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = pTable->pIndex =
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable); tsdbFreeTable(pTable);
...@@ -1191,7 +1176,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { ...@@ -1191,7 +1176,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM);
} else { } else {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1)); tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_GET_SIZE(pTable->pIndex) + 1));
} else { } else {
tlen = sizeof(SListNode) + sizeof(SActObj); tlen = sizeof(SListNode) + sizeof(SActObj);
} }
......
...@@ -7,7 +7,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4) ...@@ -7,7 +7,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4)
IF (TD_LINUX) IF (TD_LINUX)
TARGET_LINK_LIBRARIES(tutil m rt) TARGET_LINK_LIBRARIES(tutil m rt)
ADD_SUBDIRECTORY(tests) # ADD_SUBDIRECTORY(tests)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST) IF (ICONV_INCLUDE_EXIST)
......
...@@ -27,6 +27,14 @@ extern "C" { ...@@ -27,6 +27,14 @@ extern "C" {
#define MAX_SKIP_LIST_LEVEL 15 #define MAX_SKIP_LIST_LEVEL 15
#define SKIP_LIST_RECORD_PERFORMANCE 0 #define SKIP_LIST_RECORD_PERFORMANCE 0
// For key property setting
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists
#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key
#define SL_UPDATA_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert
#define SL_APPEND_DUP_KEY (uint8_t)0x3 // Update duplicate key by append
// For thread safety setting
#define SL_THREAD_SAFE (uint8_t)0x4
typedef char *SSkipListKey; typedef char *SSkipListKey;
typedef char *(*__sl_key_fn_t)(const void *); typedef char *(*__sl_key_fn_t)(const void *);
...@@ -41,6 +49,9 @@ typedef struct SSkipListNode { ...@@ -41,6 +49,9 @@ typedef struct SSkipListNode {
uint8_t level; uint8_t level;
} SSkipListNode; } SSkipListNode;
#define SL_IS_THREAD_SAFE(flags) ((flags)&SL_THREAD_SAFE)
#define SL_DUP_MODE(flags) ((flags) & ((((uint8_t)1) << 2) - 1))
#define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES) #define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES)
#define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)] #define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)]
...@@ -54,6 +65,8 @@ typedef struct SSkipListNode { ...@@ -54,6 +65,8 @@ typedef struct SSkipListNode {
#define SL_GET_SL_MAX_KEY(s) (SL_GET_NODE_KEY((s), SL_GET_BACKWARD_POINTER((s)->pTail, 0))) #define SL_GET_SL_MAX_KEY(s) (SL_GET_NODE_KEY((s), SL_GET_BACKWARD_POINTER((s)->pTail, 0)))
#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n))
#define SL_GET_SIZE(s) (s)->size
#define SL_GET_TSIZE(s) (s)->tsize
/* /*
* @version 0.3 * @version 0.3
...@@ -113,11 +126,14 @@ typedef struct SSkipListKeyInfo { ...@@ -113,11 +126,14 @@ typedef struct SSkipListKeyInfo {
typedef struct SSkipList { typedef struct SSkipList {
__compar_fn_t comparFn; __compar_fn_t comparFn;
__sl_key_fn_t keyFn; __sl_key_fn_t keyFn;
uint32_t size; pthread_rwlock_t *lock;
uint16_t len;
uint8_t maxLevel; uint8_t maxLevel;
uint8_t flags;
uint8_t type; // static info above
uint8_t level; uint8_t level;
SSkipListKeyInfo keyInfo; uint32_t size; // not including duplicate keys
pthread_rwlock_t *lock; uint32_t tsize; // including duplicate keys
SSkipListNode * pHead; // point to the first element SSkipListNode * pHead; // point to the first element
SSkipListNode * pTail; // point to the last element SSkipListNode * pTail; // point to the last element
#if SKIP_LIST_RECORD_PERFORMANCE #if SKIP_LIST_RECORD_PERFORMANCE
...@@ -145,8 +161,7 @@ typedef struct SSkipListIterator { ...@@ -145,8 +161,7 @@ typedef struct SSkipListIterator {
* @param dupKey allow the duplicated key in the skip list * @param dupKey allow the duplicated key in the skip list
* @return * @return
*/ */
SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn);
uint8_t freeNode, __sl_key_fn_t fn);
/** /**
* *
...@@ -164,30 +179,34 @@ void *tSkipListDestroy(SSkipList *pSkipList); ...@@ -164,30 +179,34 @@ void *tSkipListDestroy(SSkipList *pSkipList);
void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize);
/** /**
* put the skip list node into the skip list. * put the data into the skiplist
* If failed, NULL will be returned, otherwise, the pNode will be returned. * If failed, NULL will be returned, otherwise, the pNode will be returned.
* *
* @param pSkipList * @param pSkipList
* @param pNode * @param pData
* @param dataLen
* @return * @return
*/ */
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen);
/** /**
* get *all* nodes which key are equivalent to pKey * put the skip list node into the skip list.
* If failed, NULL will be returned, otherwise, the pNode will be returned.
* *
* @param pSkipList * @param pSkipList
* @param pKey * @param pNode
* @return * @return
*/ */
SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode);
/** /**
* get the size of skip list * get *all* nodes which key are equivalent to pKey
*
* @param pSkipList * @param pSkipList
* @param pKey
* @return * @return
*/ */
size_t tSkipListGetSize(const SSkipList *pSkipList); SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
/** /**
* display skip list of the given level, for debug purpose only * display skip list of the given level, for debug purpose only
......
...@@ -13,318 +13,649 @@ ...@@ -13,318 +13,649 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tskiplist.h"
#include "os.h" #include "os.h"
#include "tcompare.h"
#include "tulog.h" #include "tulog.h"
#include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
#include "tcompare.h"
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level #define DO_MEMSET_PTR_AREA(n) \
#if SKIP_LIST_RECORD_PERFORMANCE do { \
for (int32_t i = 0; i < level; ++i) { int32_t _l = (n)->level; \
pSkipList->state.nLevelNodeCnt[i]++; memset(pNode, 0, SL_NODE_HEADER_SIZE(_l)); \
(n)->level = _l; \
} while (0)
static int initForwardBackwardPtr(SSkipList *pSkipList);
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order);
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList);
static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList);
static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, uint8_t flags, __sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList));
if (pSkipList == NULL) {
return NULL;
} }
#endif
}
UNUSED_FUNC static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { if (maxLevel > MAX_SKIP_LIST_LEVEL) {
#if SKIP_LIST_RECORD_PERFORMANCE maxLevel = MAX_SKIP_LIST_LEVEL;
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
} }
pSkipList->maxLevel = maxLevel;
pSkipList->type = keyType;
pSkipList->len = keyLen;
pSkipList->flags = flags;
pSkipList->keyFn = fn;
pSkipList->comparFn = getKeyComparFunc(keyType);
// pSkipList->level = 1; // TODO: check if 1 is valid
if (initForwardBackwardPtr(pSkipList) < 0) {
taosTFree(pSkipList);
return NULL;
}
if (SL_IS_THREAD_SAFE(flags)) {
pSkipList->lock = (pthread_rwlock_t *)calloc(1, sizeof(pthread_rwlock_t));
if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) {
taosTFree(pSkipList->pHead);
taosTFree(pSkipList);
return NULL;
}
}
srand((uint32_t)time(NULL));
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += sizeof(SSkipList);
#endif #endif
return pSkipList;
} }
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { void *tSkipListDestroy(SSkipList *pSkipList) {
const uint32_t factor = 4; if (pSkipList == NULL) return NULL;
int32_t n = 1; tSkipListWLock(pSkipList);
while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
n++; SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0);
while (pNode != pSkipList->pTail) {
SSkipListNode *pTemp = pNode;
pNode = SL_GET_FORWARD_POINTER(pNode, 0);
taosTFree(pTemp);
} }
return n; tSkipListUnlock(pSkipList);
if (pSkipList->lock != NULL) {
pthread_rwlock_destroy(pSkipList->lock);
taosTFree(pSkipList->lock);
}
taosTFree(pSkipList->pHead);
taosTFree(pSkipList);
return NULL;
} }
static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) {
int32_t level = getSkipListNodeRandomHeight(pSkipList); if (pSkipList == NULL) {
if (pSkipList->size == 0) { *level = 1;
level = 1; *headSize = SL_NODE_HEADER_SIZE(*level);
pSkipList->level = 1; return;
} else {
if (level > pSkipList->level) {
if (pSkipList->level < pSkipList->maxLevel) {
level = (++pSkipList->level);
} else {
level = pSkipList->level;
} }
*level = getSkipListRandLevel(pSkipList);
*headSize = SL_NODE_HEADER_SIZE(*level);
}
SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData, int dataLen) {
if (pSkipList == NULL || pData == NULL) return NULL;
tSkipListWLock(pSkipList);
int32_t level = getSkipListRandLevel(pSkipList);
SSkipListNode *pNode = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(level) + dataLen);
if (pNode == NULL) {
tSkipListUnlock(pSkipList);
return NULL;
} }
pNode->level = level;
memcpy(SL_GET_NODE_DATA(pNode), pData, dataLen);
if (tSkipListPutNodeImpl(pSkipList, pNode) == NULL) {
tSkipListUnlock(pSkipList);
taosTFree(pNode);
return NULL;
} }
assert(level <= pSkipList->maxLevel); tSkipListUnlock(pSkipList);
return level;
return pNode;
} }
#define DO_MEMSET_PTR_AREA(n) do {\ SSkipListNode *tSkipListPutNode(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t _l = (n)->level;\ SSkipListNode *pRetNode = NULL;
memset(pNode, 0, SL_NODE_HEADER_SIZE(_l));\
(n)->level = _l;\
} while(0)
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode); if (pSkipList == NULL || pNode == NULL) return NULL;
static SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode);
static SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode);
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
tSkipListWLock(pSkipList);
pRetNode = tSkipListPutNodeImpl(pSkipList, pNode);
tSkipListUnlock(pSkipList);
// when order is TSDB_ORDER_ASC, return the last node with key less than val return pRetNode;
// when order is TSDB_ORDER_DESC, return the first node with key large than val }
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) {
__compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL;
if (order == TSDB_ORDER_ASC) { SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
pNode = pSkipList->pHead; SArray *sa = taosArrayInit(1, POINTER_BYTES);
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); tSkipListRLock(pSkipList);
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p); SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
if (comparFn(key, val) < 0) { while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
break;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
taosArrayPush(sa, &p);
pNode = p; pNode = p;
p = SL_GET_FORWARD_POINTER(p, i); }
tSkipListUnlock(pSkipList);
return sa;
}
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
uint32_t count = 0;
tSkipListWLock(pSkipList);
SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
break;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
tSkipListRemoveNodeImpl(pSkipList, p);
++count;
}
tSkipListCorrectLevel(pSkipList);
tSkipListUnlock(pSkipList);
return count;
}
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
tSkipListWLock(pSkipList);
tSkipListRemoveNodeImpl(pSkipList, pNode);
tSkipListCorrectLevel(pSkipList);
tSkipListUnlock(pSkipList);
}
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
if (pSkipList == NULL) return NULL;
return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC);
}
SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order) {
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
assert(pSkipList != NULL);
SSkipListIterator *iter = doCreateSkipListIterator(pSkipList, order);
if (val == NULL) {
return iter;
}
tSkipListRLock(pSkipList);
iter->cur = getPriorNode(pSkipList, val, order);
tSkipListUnlock(pSkipList);
return iter;
}
bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->pSkipList == NULL) return false;
SSkipList *pSkipList = iter->pSkipList;
uint8_t dupMod = SL_DUP_MODE(pSkipList->flags);
tSkipListRLock(pSkipList);
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
if (dupMod == SL_APPEND_DUP_KEY) {
if (iter->cur == pSkipList->pHead) {
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
iter->step++;
} else {
while (true) {
iter->step++;
SSkipListNode *pNode = iter->cur;
iter->cur = SL_GET_FORWARD_POINTER(pNode, 0);
if (iter->cur == pSkipList->pTail) break;
if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) {
continue;
} else { } else {
break; break;
} }
} }
} }
if (iter->cur != pSkipList->pTail) {
while (true) {
SSkipListNode *pNode = SL_GET_FORWARD_POINTER(iter->cur, 0);
if (pNode == pSkipList->pTail) break;
if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) {
iter->step++;
iter->cur = pNode;
} else { } else {
pNode = pSkipList->pTail; break;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) { }
SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i); }
while (p != pSkipList->pHead) { }
char *key = SL_GET_NODE_KEY(pSkipList, p); } else {
if (comparFn(key, val) > 0) { iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
pNode = p; iter->step++;
p = SL_GET_BACKWARD_POINTER(p, i); }
} else { // descending order iterate
if (dupMod == SL_APPEND_DUP_KEY) {
if (iter->cur == pSkipList->pTail) {
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
iter->step++;
} else {
while (true) {
SSkipListNode *pNode = iter->cur;
iter->cur = SL_GET_BACKWARD_POINTER(pNode, 0);
if (iter->cur == pSkipList->pHead) break;
if (pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, pNode), SL_GET_NODE_KEY(pSkipList, iter->cur)) == 0) {
iter->cur = pNode;
continue;
} else { } else {
break; break;
} }
} }
} }
} else {
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
iter->step++;
}
} }
return pNode; tSkipListUnlock(pSkipList);
return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead);
} }
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) {
if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) {
return NULL;
} else {
return iter->cur;
}
}
static bool initForwardBackwardPtr(SSkipList* pSkipList) { void *tSkipListDestroyIter(SSkipListIterator *iter) {
uint32_t maxLevel = pSkipList->maxLevel; if (iter == NULL) {
return NULL;
}
// head info taosTFree(iter);
pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel) * 2); return NULL;
if (pSkipList->pHead == NULL) { }
return false;
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
return;
} }
pSkipList->pHead->level = pSkipList->maxLevel; SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
// tail info int32_t id = 1;
pSkipList->pTail = (SSkipListNode*) ((char*) pSkipList->pHead + SL_NODE_HEADER_SIZE(maxLevel)); char * prev = NULL;
pSkipList->pTail->level = pSkipList->maxLevel;
for (uint32_t i = 0; i < maxLevel; ++i) { while (p != pSkipList->pTail) {
SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail; char *key = SL_GET_NODE_KEY(pSkipList, p);
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead; if (prev != NULL) {
assert(pSkipList->comparFn(prev, key) < 0);
} }
return true; switch (pSkipList->type) {
} case TSDB_DATA_TYPE_INT:
fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key);
break;
case TSDB_DATA_TYPE_BINARY:
fprintf(stdout, "%d: %s \n", id++, key);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, *(double *)key);
break;
default:
fprintf(stdout, "\n");
}
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, prev = SL_GET_NODE_KEY(pSkipList, p);
uint8_t freeNode, __sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); p = SL_GET_FORWARD_POINTER(p, nlevel - 1);
if (pSkipList == NULL) {
return NULL;
} }
}
if (maxLevel > MAX_SKIP_LIST_LEVEL) { static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode, bool hasDupKey) {
maxLevel = MAX_SKIP_LIST_LEVEL; uint8_t dupMode = SL_DUP_MODE(pSkipList->flags);
// FIXME: this may cause the level of skiplist change
if (dupMode == SL_UPDATA_DUP_KEY && hasDupKey) {
tSkipListRemoveNodeImpl(pSkipList, forward[0]);
tSkipListCorrectLevel(pSkipList);
} }
pSkipList->keyInfo.type = keyType; DO_MEMSET_PTR_AREA(pNode);
pSkipList->keyInfo.len = keyLen;
pSkipList->keyInfo.dupKey = dupKey;
pSkipList->keyInfo.freeNode = freeNode;
pSkipList->keyFn = fn; for (int32_t i = 0; i < pNode->level; ++i) {
pSkipList->comparFn = getKeyComparFunc(keyType); SSkipListNode *x = forward[i];
pSkipList->maxLevel = maxLevel; SL_GET_BACKWARD_POINTER(pNode, i) = x;
pSkipList->level = 1;
if (!initForwardBackwardPtr(pSkipList)) { SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i);
taosTFree(pSkipList); SL_GET_BACKWARD_POINTER(next, i) = pNode;
return NULL;
SL_GET_FORWARD_POINTER(pNode, i) = next;
SL_GET_FORWARD_POINTER(x, i) = pNode;
} }
if (lock) { if (!(dupMode == SL_APPEND_DUP_KEY && hasDupKey)) {
pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t)); pSkipList->size += 1;
}
pSkipList->tsize += 1;
}
if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) {
taosTFree(pSkipList->pHead); SSkipListIterator *iter = calloc(1, sizeof(SSkipListIterator));
taosTFree(pSkipList);
return NULL; iter->pSkipList = pSkipList;
} iter->order = order;
if (order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead;
} else {
iter->cur = pSkipList->pTail;
} }
srand((uint32_t)time(NULL)); return iter;
}
#if SKIP_LIST_RECORD_PERFORMANCE static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList) {
pSkipList->state.nTotalMemSize += sizeof(SSkipList); if (SL_IS_THREAD_SAFE(pSkipList->flags)) {
#endif return pthread_rwlock_wrlock(pSkipList->lock);
}
return 0;
}
return pSkipList; static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList) {
if (SL_IS_THREAD_SAFE(pSkipList->flags)) {
return pthread_rwlock_rdlock(pSkipList->lock);
}
return 0;
} }
void *tSkipListDestroy(SSkipList *pSkipList) { static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) {
if (pSkipList == NULL) { if (SL_IS_THREAD_SAFE(pSkipList->flags)) {
return NULL; return pthread_rwlock_unlock(pSkipList->lock);
} }
return 0;
}
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
int compare = 1;
bool hasDupKey = false;
char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode);
uint8_t dupMode = SL_DUP_MODE(pSkipList->flags);
if (pSkipList->lock) { if (pSkipList->size == 0) {
pthread_rwlock_wrlock(pSkipList->lock); for (int i = 0; i < pNode->level; i++) {
forward[i] = pSkipList->pHead;
} }
} else {
char *pKey = NULL;
if (pSkipList->keyInfo.freeNode) { // Compare min key
SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); pKey = SL_GET_SL_MIN_KEY(pSkipList);
compare = pSkipList->comparFn(pNodeKey, pKey);
if ((dupMode == SL_APPEND_DUP_KEY && compare < 0) || (dupMode != SL_APPEND_DUP_KEY && compare <= 0)) {
for (int i = 0; i < pNode->level; i++) {
forward[i] = pSkipList->pHead;
}
return (compare == 0);
}
while (pNode != pSkipList->pTail) { // Compare max key
SSkipListNode *pTemp = pNode; pKey = SL_GET_SL_MAX_KEY(pSkipList);
pNode = SL_GET_FORWARD_POINTER(pNode, 0); compare = pSkipList->comparFn(pNodeKey, pKey);
taosTFree(pTemp); if ((dupMode == SL_DISCARD_DUP_KEY && compare > 0) || (dupMode != SL_DISCARD_DUP_KEY && compare >= 0)) {
for (int i = 0; i < pNode->level; i++) {
forward[i] = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i);
} }
return (compare == 0);
} }
if (pSkipList->lock) { SSkipListNode *px = pSkipList->pHead;
pthread_rwlock_unlock(pSkipList->lock); for (int i = pSkipList->level - 1; i >= 0; --i) {
pthread_rwlock_destroy(pSkipList->lock); SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i);
while (p != pSkipList->pTail) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
taosTFree(pSkipList->lock); compare = pSkipList->comparFn(pKey, pNodeKey);
if (compare == 0 && hasDupKey == false) hasDupKey = true;
if ((dupMode == SL_APPEND_DUP_KEY && compare > 0) || (dupMode != SL_APPEND_DUP_KEY && compare >= 0)) {
break;
} else {
px = p;
p = SL_GET_FORWARD_POINTER(px, i);
}
} }
taosTFree(pSkipList->pHead); forward[i] = px;
taosTFree(pSkipList); }
return NULL; }
return hasDupKey;
} }
void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { static bool tSkipListIsNodeDup(SSkipList *pSkipList, SSkipListNode *pNode) {
if (pSkipList == NULL) { SSkipListNode *pPrevNode = SL_GET_BACKWARD_POINTER(pNode, 0);
*level = 1; SSkipListNode *pNextNode = SL_GET_FORWARD_POINTER(pNode, 0);
*headSize = SL_NODE_HEADER_SIZE(*level); char * pNodeKey = SL_GET_NODE_KEY(pSkipList, pNode);
return; char * pPrevNodeKey = (pPrevNode == pSkipList->pHead) ? NULL : SL_GET_NODE_KEY(pSkipList, pPrevNode);
char * pNextNodeKey = (pNextNode == pSkipList->pTail) ? NULL : SL_GET_NODE_KEY(pSkipList, pNextNode);
return ((pPrevNodeKey != NULL && pSkipList->comparFn(pNodeKey, pPrevNodeKey) == 0) ||
(pNextNodeKey != NULL && pSkipList->comparFn(pNodeKey, pNextNodeKey) == 0));
}
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t level = pNode->level;
uint8_t dupMode = SL_DUP_MODE(pSkipList->flags);
bool sizeReduce = !(dupMode == SL_APPEND_DUP_KEY && tSkipListIsNodeDup(pSkipList, pNode));
for (int32_t j = level - 1; j >= 0; --j) {
SSkipListNode *prev = SL_GET_BACKWARD_POINTER(pNode, j);
SSkipListNode *next = SL_GET_FORWARD_POINTER(pNode, j);
SL_GET_FORWARD_POINTER(prev, j) = next;
SL_GET_BACKWARD_POINTER(next, j) = prev;
} }
*level = getSkipListRandLevel(pSkipList); taosTFree(pNode);
*headSize = SL_NODE_HEADER_SIZE(*level);
if (sizeReduce) pSkipList->size--;
pSkipList->tsize--;
} }
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { // Function must be called after calling tSkipListRemoveNodeImpl() function
if (pSkipList == NULL || pNode == NULL) { static void tSkipListCorrectLevel(SSkipList *pSkipList) {
return NULL; while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) {
pSkipList->level -= 1;
}
}
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList,
int32_t level) { // record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]++;
} }
#endif
}
if (pSkipList->lock) { UNUSED_FUNC static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) {
pthread_rwlock_wrlock(pSkipList->lock); #if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
} }
#endif
}
// if the new key is greater than the maximum key of skip list, push back this node at the end of skip list static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) {
char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode); const uint32_t factor = 4;
if (pSkipList->size == 0 || pSkipList->comparFn(SL_GET_SL_MAX_KEY(pSkipList), newDatakey) < 0) {
return tSkipListPushBack(pSkipList, pNode); int32_t n = 1;
while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
n++;
} }
// if the new key is less than the minimum key of skip list, push front this node at the front of skip list return n;
assert(pSkipList->size > 0); }
char* minKey = SL_GET_SL_MIN_KEY(pSkipList);
if (pSkipList->comparFn(newDatakey, minKey) < 0) { static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
return tSkipListPushFront(pSkipList, pNode); int32_t level = 0;
if (pSkipList->size == 0) {
level = 1;
} else {
level = getSkipListNodeRandomHeight(pSkipList);
if (level > pSkipList->level) {
if (pSkipList->level < pSkipList->maxLevel) {
level = pSkipList->level + 1;
} else {
level = pSkipList->level;
}
}
} }
// find the appropriated position to insert data assert(level <= pSkipList->maxLevel);
SSkipListNode *px = pSkipList->pHead; return level;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; }
int32_t ret = -1; // when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order) {
__compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL;
if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) { for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
while (p != pSkipList->pTail) { while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p); char *key = SL_GET_NODE_KEY(pSkipList, p);
if (comparFn(key, val) < 0) {
// if the forward element is less than the specified key, forward one step pNode = p;
ret = pSkipList->comparFn(key, newDatakey); p = SL_GET_FORWARD_POINTER(p, i);
if (ret < 0) {
px = p;
p = SL_GET_FORWARD_POINTER(px, i);
} else { } else {
break; break;
} }
} }
forward[i] = px;
} }
} else {
// if the skip list does not allowed identical key inserted, the new data will be discarded. pNode = pSkipList->pTail;
if (pSkipList->keyInfo.dupKey == 0 && ret == 0) { for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
if (pSkipList->lock) { SSkipListNode *p = SL_GET_BACKWARD_POINTER(pNode, i);
pthread_rwlock_unlock(pSkipList->lock); while (p != pSkipList->pHead) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if (comparFn(key, val) > 0) {
pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i);
} else {
break;
}
}
} }
return NULL;
} }
tSkipListDoInsert(pSkipList, forward, pNode);
return pNode; return pNode;
} }
static int initForwardBackwardPtr(SSkipList *pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel;
// head info
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { pSkipList->pHead = (SSkipListNode *)malloc(SL_NODE_HEADER_SIZE(maxLevel) * 2);
SArray* sa = taosArrayInit(1, POINTER_BYTES); if (pSkipList->pHead == NULL) {
return -1;
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); pSkipList->pHead->level = maxLevel;
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); // tail info
if (p == pSkipList->pTail) { pSkipList->pTail = (SSkipListNode *)POINTER_SHIFT(pSkipList->pHead, SL_NODE_HEADER_SIZE(maxLevel));
break; pSkipList->pTail->level = maxLevel;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
taosArrayPush(sa, &p);
pNode = p;
}
if (pSkipList->lock) { for (uint32_t i = 0; i < maxLevel; ++i) {
pthread_rwlock_unlock(pSkipList->lock); SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead;
} }
return sa; return 0;
} }
static FORCE_INLINE SSkipListNode *tSkipListPutNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
SSkipListNode *pRetNode = NULL;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
int hasDupKey = tSkipListGetPosToPut(pSkipList, forward, pNode);
if (SL_DUP_MODE(pSkipList->flags) == SL_DISCARD_DUP_KEY && hasDupKey) {
pRetNode = NULL;
} else {
pRetNode = pNode;
tSkipListDoInsert(pSkipList, forward, pNode, hasDupKey);
size_t tSkipListGetSize(const SSkipList* pSkipList) { if (pNode->level > pSkipList->level) pSkipList->level = pNode->level;
if (pSkipList == NULL) {
return 0;
} }
return pSkipList->size; return pRetNode;
} }
// static void tSkipListSeek(SSkipList *pSkipList, char *key, int order) {
// // TODO
// }
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) { // int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock); // pthread_rwlock_rdlock(&pSkipList->lock);
...@@ -444,158 +775,7 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) { ...@@ -444,158 +775,7 @@ size_t tSkipListGetSize(const SSkipList* pSkipList) {
// //
// return true; // return true;
//} //}
//
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
uint32_t count = 0;
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
break;
}
if (pSkipList->comparFn(key, SL_GET_NODE_KEY(pSkipList, p)) != 0) {
break;
}
for (int32_t j = p->level - 1; j >= 0; --j) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(p, j);
SSkipListNode* next = SL_GET_FORWARD_POINTER(p, j);
SL_GET_FORWARD_POINTER(prev, j) = next;
SL_GET_BACKWARD_POINTER(next, j) = prev;
}
if (pSkipList->keyInfo.freeNode) {
taosTFree(p);
}
++count;
}
// compress the minimum level of skip list
while (pSkipList->level > 0) {
if (SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) != NULL) {
break;
}
pSkipList->level--;
}
pSkipList->size -= count;
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return count;
}
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t level = pNode->level;
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
for (int32_t j = level - 1; j >= 0; --j) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j);
SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j);
SL_GET_FORWARD_POINTER(prev, j) = next;
SL_GET_BACKWARD_POINTER(next, j) = prev;
}
if (pSkipList->keyInfo.freeNode) {
taosTFree(pNode);
}
atomic_sub_fetch_32(&pSkipList->size, 1);
// compress the minimum level of skip list
while (pSkipList->level > 0 && SL_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) {
pSkipList->level -= 1;
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
}
SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) {
if (pSkipList == NULL) {
return NULL;
}
return doCreateSkipListIterator(pSkipList, TSDB_ORDER_ASC);
}
SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* val, int32_t type, int32_t order) {
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
assert(pSkipList != NULL);
SSkipListIterator* iter = doCreateSkipListIterator(pSkipList, order);
if (val == NULL) {
return iter;
}
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
iter->cur = getPriorNode(pSkipList, val, order);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return iter;
}
bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->pSkipList == NULL) {
return false;
}
SSkipList *pSkipList = iter->pSkipList;
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
iter->step += 1;
return (iter->order == TSDB_ORDER_ASC)? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead);
}
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) {
if (iter == NULL || iter->cur == iter->pSkipList->pTail || iter->cur == iter->pSkipList->pHead) {
return NULL;
} else {
return iter->cur;
}
}
void* tSkipListDestroyIter(SSkipListIterator* iter) {
if (iter == NULL) {
return NULL;
}
taosTFree(iter);
return NULL;
}
// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) { // bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; // SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType); // __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
...@@ -615,109 +795,3 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { ...@@ -615,109 +795,3 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) {
// //
// return ret; // return ret;
//} //}
\ No newline at end of file
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
return;
}
SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1;
char* prev = NULL;
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if (prev != NULL) {
assert(pSkipList->comparFn(prev, key) < 0);
}
switch (pSkipList->keyInfo.type) {
case TSDB_DATA_TYPE_INT:
fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key);
break;
case TSDB_DATA_TYPE_BINARY:
fprintf(stdout, "%d: %s \n", id++, key);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, *(double *)key);
break;
default:
fprintf(stdout, "\n");
}
prev = SL_GET_NODE_KEY(pSkipList, p);
p = SL_GET_FORWARD_POINTER(p, nlevel - 1);
}
}
void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
DO_MEMSET_PTR_AREA(pNode);
for (int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode *x = forward[i];
SL_GET_BACKWARD_POINTER(pNode, i) = x;
SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i);
SL_GET_BACKWARD_POINTER(next, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = next;
SL_GET_FORWARD_POINTER(x, i) = pNode;
}
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
}
SSkipListNode* tSkipListPushFront(SSkipList* pSkipList, SSkipListNode *pNode) {
SSkipListNode* forward[MAX_SKIP_LIST_LEVEL] = {0};
for(int32_t i = 0; i < pSkipList->level; ++i) {
forward[i] = pSkipList->pHead;
}
tSkipListDoInsert(pSkipList, forward, pNode);
return pNode;
}
SSkipListNode* tSkipListPushBack(SSkipList *pSkipList, SSkipListNode *pNode) {
// do clear pointer area
DO_MEMSET_PTR_AREA(pNode);
for(int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i);
SL_GET_FORWARD_POINTER(prev, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
}
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pNode;
}
SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) {
SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator));
iter->pSkipList = pSkipList;
iter->order = order;
if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead;
} else {
iter->cur = pSkipList->pTail;
}
return iter;
}
\ No newline at end of file
...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() { ...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() {
printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size); printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size);
printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
// printf("the level of skiplist is:\n"); // printf("the level of skiplist is:\n");
// //
...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() { ...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() {
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
tSkipListDestroy(pSkipList); tSkipListDestroy(pSkipList);
taosTFree(total); taosTFree(total);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册