From cae5edeae82360d0336f167a2a0fb8437ebe7877 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 5 Aug 2020 18:14:49 +0800 Subject: [PATCH] make skiplist node malloc --- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbMemTable.c | 35 ++++++++++++++++++++++++----------- src/tsdb/src/tsdbRead.c | 12 ++++++------ src/tsdb/tests/tsdbTests.cpp | 2 +- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5a06dfa65a..d3c4a3d515 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -415,7 +415,7 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { SSkipListNode* node = tSkipListIterGet(pIter); if (node == NULL) return NULL; - return SL_GET_NODE_DATA(node); + return *(SDataRow *)SL_GET_NODE_DATA(node); } static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 44f787a8c0..33e91bd59c 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -45,7 +45,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { SMemTable * pMemTable = pRepo->mem; STableData *pTableData = NULL; SSkipList * pSList = NULL; - int bytes = 0; if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { @@ -55,27 +54,39 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { tSkipListNewNodeInfo(pSList, &level, &headSize); - bytes = headSize + dataRowLen(row); - SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes); + SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *)); if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); + if (pRow == NULL) { tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno)); + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); + free(pNode); return -1; } + pNode->level = level; - dataRowCpy(SL_GET_NODE_DATA(pNode), row); + dataRowCpy(pRow, row); + *(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow; // Operations above may change pRepo->mem, retake those values ASSERT(pRepo->mem != NULL); pMemTable = pRepo->mem; if (TABLE_TID(pTable) >= pMemTable->maxTables) { - if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;; + if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { + tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); + free(pNode); + return -1; + } } pTableData = pMemTable->tData[TABLE_TID(pTable)]; if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem) + if (pTableData != NULL) { taosWLockLatch(&(pMemTable->latch)); pMemTable->tData[TABLE_TID(pTable)] = NULL; tsdbFreeTableData(pTableData); @@ -87,7 +98,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while create new table data object since %s", REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); - tsdbFreeBytes(pRepo, (void *)pNode, bytes); + tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); + free(pNode); return -1; } @@ -97,7 +109,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); if (tSkipListPut(pTableData->pData, pNode) == NULL) { - tsdbFreeBytes(pRepo, (void *)pNode, bytes); + tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); + free(pNode); } else { if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; @@ -416,7 +429,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->numOfRows = 0; pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey); + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -436,7 +449,7 @@ static void tsdbFreeTableData(STableData *pTableData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } +static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static void *tsdbCommitData(void *arg) { STsdbRepo * pRepo = (STsdbRepo *)arg; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index ccc631fb58..a34216d2fb 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -343,7 +343,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - SDataRow row = SL_GET_NODE_DATA(node); + SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); @@ -356,7 +356,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - SDataRow row = SL_GET_NODE_DATA(node); + SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); @@ -378,14 +378,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) { if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = SL_GET_NODE_DATA(node); + rmem = *(SDataRow *)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = SL_GET_NODE_DATA(node); + rimem = *(SDataRow *)SL_GET_NODE_DATA(node); } } @@ -1184,8 +1184,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { + ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index e504109cec..605586515b 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -35,7 +35,7 @@ static int insertData(SInsertInfo *pInfo) { for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) { memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - SSubmitBlk *pBlock = pMsg->blocks; + SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks; pBlock->uid = pInfo->uid; pBlock->tid = pInfo->tid; pBlock->sversion = pInfo->sversion; -- GitLab