提交 e9a3969b 编写于 作者: L lichuang

[TD-3963]async commit a msg to save config

上级 12224101
...@@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT ...@@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbAsyncCommitConfig(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo); void* tsdbCommitData(STsdbRepo* pRepo);
......
...@@ -154,6 +154,7 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -154,6 +154,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL; SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL; STsdbRepo * pRepo = NULL;
bool config_changed = false;
while (true) { while (true) {
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -177,11 +178,17 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -177,11 +178,17 @@ static void *tsdbLoopCommit(void *arg) {
pRepo = ((SCommitReq *)pNode->data)->pRepo; pRepo = ((SCommitReq *)pNode->data)->pRepo;
// check if need to apply new config // check if need to apply new config
config_changed = pRepo->config_changed;
if (pRepo->config_changed) { if (pRepo->config_changed) {
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
} }
if (config_changed && pRepo->imem == NULL) {
tsem_post(&(pRepo->readyToCommit));
} else {
tsdbCommitData(pRepo); tsdbCommitData(pRepo);
}
listNodeFree(pNode); listNodeFree(pNode);
} }
......
...@@ -271,7 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { ...@@ -271,7 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
pthread_mutex_unlock(&repo->save_mutex); pthread_mutex_unlock(&repo->save_mutex);
// schedule a commit msg then the new config will be applied immediatly // schedule a commit msg then the new config will be applied immediatly
tsdbAsyncCommit(repo); tsdbAsyncCommitConfig(repo);
return 0; return 0;
#if 0 #if 0
......
...@@ -271,10 +271,25 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -271,10 +271,25 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return ptr; return ptr;
} }
int tsdbAsyncCommitConfig(STsdbRepo* pRepo) {
ASSERT(pRepo->config_changed == true);
tsem_wait(&(pRepo->readyToCommit));
if (pRepo->code != TSDB_CODE_SUCCESS) {
tsdbWarn("vgId:%d try to commit config when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno));
}
if (tsdbLockRepo(pRepo) < 0) return -1;
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0;
}
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
//ASSERT(pRepo->imem == NULL); ASSERT(pRepo->imem == NULL);
if (pRepo->mem == NULL) { if (pRepo->mem == NULL) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册