提交 176d247e 编写于 作者: H Hongze Cheng

resolve some static scan error

上级 ec149ac4
...@@ -516,7 +516,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { ...@@ -516,7 +516,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
SFile file; SFile file;
SFile * pFile = &file; SFile * pFile = &file;
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN); strncpy(pFile->fname, fname, TSDB_FILENAME_LEN - 1);
pFile->fd = -1; pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable); static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
...@@ -41,6 +40,7 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIte ...@@ -41,6 +40,7 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIte
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
...@@ -97,7 +97,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -97,7 +97,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
} }
int code = pthread_cond_signal(&pBufPool->poolNotEmpty); int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) { if (code != 0) {
tsdbUnlockRepo(pRepo); if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
return -1; return -1;
...@@ -134,6 +134,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { ...@@ -134,6 +134,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
} }
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
if (pMem != NULL) { if (pMem != NULL) {
taosRUnLockLatch(&(pMem->latch)); taosRUnLockLatch(&(pMem->latch));
tsdbUnRefMemTable(pRepo, pMem); tsdbUnRefMemTable(pRepo, pMem);
...@@ -142,8 +144,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) ...@@ -142,8 +144,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
if (pIMem != NULL) { if (pIMem != NULL) {
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
} }
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
} }
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
...@@ -175,6 +175,10 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -175,6 +175,10 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
ASSERT(pRepo->mem->extraBuffList != NULL); ASSERT(pRepo->mem->extraBuffList != NULL);
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes); SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
if (pNode == NULL) { if (pNode == NULL) {
if (listNEles(pRepo->mem->extraBuffList) == 0) {
tdListFree(pRepo->mem->extraBuffList);
pRepo->mem->extraBuffList = NULL;
}
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -205,18 +209,18 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -205,18 +209,18 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
} }
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0;
SMemTable *pIMem = pRepo->imem; SMemTable *pIMem = pRepo->imem;
if (pRepo->mem != NULL) { sem_wait(&(pRepo->readyToCommit));
sem_wait(&(pRepo->readyToCommit));
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem; pRepo->imem = pRepo->mem;
pRepo->mem = NULL; pRepo->mem = NULL;
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
}
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
...@@ -414,25 +418,6 @@ _exit: ...@@ -414,25 +418,6 @@ _exit:
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
ASSERT(pRepo->mem != NULL);
if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
pBufBlock->offset -= bytes;
pBufBlock->remain += bytes;
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
} else {
SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode)));
ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
tdListPopNode(pRepo->mem->extraBuffList, pNode);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
}
}
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
...@@ -922,8 +907,8 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -922,8 +907,8 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
tsdbInitSubmitBlkIter(pBlock, &blkIter); tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
free(rows); tsdbFreeRows(pRepo, rows, rowCounter);
return -1; goto _err;
} }
(*affectedrows)++; (*affectedrows)++;
...@@ -935,8 +920,7 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -935,8 +920,7 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
} }
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
free(rows); goto _err;
return -1;
} }
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
...@@ -945,6 +929,10 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -945,6 +929,10 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
free(rows); free(rows);
return 0; return 0;
_err:
free(rows);
return -1;
} }
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) { static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) {
...@@ -1104,9 +1092,7 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro ...@@ -1104,9 +1092,7 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
for (int i = rowCounter - 1; i >= 0; i--) { tsdbFreeRows(pRepo, rows, rowCounter);
tsdbFreeBytes(pRepo, rows[i], dataRowLen(rows[i]));
}
return -1; return -1;
} }
} }
...@@ -1124,9 +1110,7 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro ...@@ -1124,9 +1110,7 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
if (pTableData == NULL) { if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
for (int i = rowCounter - 1; i >= 0; i--) { tsdbFreeRows(pRepo, rows, rowCounter);
tsdbFreeBytes(pRepo, rows[i], dataRowLen(rows[i]));
}
return -1; return -1;
} }
...@@ -1151,4 +1135,43 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro ...@@ -1151,4 +1135,43 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]); if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
return 0; return 0;
}
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
ASSERT(pRepo->mem != NULL);
STsdbBufPool *pBufPool = pRepo->pPool;
for (int i = rowCounter - 1; i >= 0; --i) {
SDataRow row = (SDataRow)rows[i];
int bytes = (int)dataRowLen(row);
if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes);
pBufBlock->offset -= bytes;
pBufBlock->remain += bytes;
ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
if (pBufBlock->offset == 0) { // return the block to buffer pool
tsdbLockRepo(pRepo);
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
tdListPrependNode(pBufPool->bufBlockList, pNode);
tsdbUnlockRepo(pRepo);
}
} else {
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList);
ASSERT(row == pNode->data);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
if (listNEles(pRepo->mem->extraBuffList) == 0) {
tdListFree(pRepo->mem->extraBuffList);
pRepo->mem->extraBuffList = NULL;
}
}
}
} }
\ No newline at end of file
...@@ -1595,7 +1595,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1595,7 +1595,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
tblkIdx++; tblkIdx++;
} else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { } else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) {
// Delete the block and do some stuff // Delete the block and do some stuff
ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); // ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1; if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1;
*pCommitIter->pIter = slIter; *pCommitIter->pIter = slIter;
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册