提交 7aa1ea29 编写于 作者: H Hongze Cheng

TD-353

上级 21a5beab
...@@ -123,8 +123,6 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -123,8 +123,6 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
} }
ASSERT(!POOL_IS_EMPTY(pBufPool));
SListNode * pNode = tdListPopHead(pBufPool->bufBlockList); SListNode * pNode = tdListPopHead(pBufPool->bufBlockList);
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
......
...@@ -97,7 +97,6 @@ int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -97,7 +97,6 @@ int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
// Need to lock the repository // Need to lock the repository
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
ASSERT(IS_REPO_LOCKED(pRepo));
ASSERT(pMemTable != NULL); ASSERT(pMemTable != NULL);
if (T_REF_DEC(pMemTable) == 0) { if (T_REF_DEC(pMemTable) == 0) {
...@@ -105,12 +104,15 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -105,12 +104,15 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL; SListNode *pNode = NULL;
// TODO: check the correctness of this code part
if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
tdListAppendNode(pBufPool->bufBlockList, pNode); tdListAppendNode(pBufPool->bufBlockList, pNode);
if (pthread_cond_signal(&pBufPool->poolNotEmpty) != 0) { if (pthread_cond_signal(&pBufPool->poolNotEmpty) != 0) {
// TODO // TODO
} }
} }
if (tsdbUnlockRepo(pRepo) < 0) return -1;
for (int i = 0; i < pCfg->maxTables; i++) { for (int i = 0; i < pCfg->maxTables; i++) {
if (pMemTable->tData[i] != NULL) { if (pMemTable->tData[i] != NULL) {
...@@ -127,7 +129,8 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -127,7 +129,8 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
if (pRepo == NULL || pRepo->mem == NULL) return NULL; ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem); SListNode *pNode = listTail(pRepo->mem);
if (pNode == NULL) return NULL; if (pNode == NULL) return NULL;
...@@ -141,45 +144,40 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { ...@@ -141,45 +144,40 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
int code = 0;
if (pBufBlock != NULL && pBufBlock->remain < bytes) { if (pBufBlock != NULL && pBufBlock->remain < bytes) {
if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to trigger commit if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to commit mem
if (pRepo->imem != NULL) { if (pRepo->imem) {
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); code = pthread_join(pRepo->commitThread, NULL);
if (code != 0) {
ASSERT(pRepo->commit == 0); tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
SMemTable *pIMem = pRepo->imem;
if (tsdbLockRepo(pRepo) < 0) {
// TODO
return NULL;
}
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
pRepo->commit = 1;
if (pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo) != 0) {
// TODO
tsdbUnlockRepo(pRepo);
return NULL;
}
if (tsdbUnlockRepo(pRepo) < 0) {
// TODO
return NULL; return NULL;
} }
tsdbUnRefMemTable(pRepo, pIMem);
} }
} else {
if (tsdbLockRepo(pRepo) < 0) { ASSERT(pRepo->commit == 0);
tsdbFreeMemTable(pMemTable); SMemTable *pImem = pRepo->imem;
if (tsdbLockRepo(pRepo) < 0) return NULL;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
pRepo->commit = 1;
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
if (code != 0) {
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
tsdbUnlockRepo(pRepo);
return NULL; return NULL;
} }
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
if (pImem && tsdbUnRefMemTable(pRepo, pImem) < 0) return NULL;
} else {
if (tsdbLockRepo(pRepo) < 0) return NULL;
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
tdListAppendNode(pMemTable->bufBlockList, pNode); tdListAppendNode(pRepo->mem->bufBlockList, pNode);
pRepo->mem = pMemTable;
if (tsdbUnlockRepo(pRepo) < 0) return NULL; if (tsdbUnlockRepo(pRepo) < 0) return NULL;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册