提交 e0b1585e 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1413

......@@ -241,8 +241,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
pContext->dbConn = pSql->pTscObj;
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj);
}
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
}
static void cqProcessCreateTimer(void *param, void *tmrId) {
......@@ -253,7 +257,9 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
} else {
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
}
}
......@@ -267,12 +273,14 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
pObj->tmrId = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
}
}
......
......@@ -925,6 +925,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
}
ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last));
tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
......@@ -1050,6 +1052,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last));
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
......@@ -1630,11 +1634,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
pCfg->update);
if (pDataCols->numOfRows == 0) break;
if (tblkIdx == pIdx->numOfBlocks - 1) {
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
} else {
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
}
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (round == 0) {
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
......
......@@ -93,17 +93,23 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
return ret;
}
int64_t tfFsync(int64_t tfd) {
int32_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return fsync(fd);
int32_t code = fsync(fd);
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
bool tfValid(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
return p != NULL;
if (p == NULL) return false;
taosReleaseRef(tsFileRsetId, tfd);
return true;
}
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
......@@ -111,7 +117,10 @@ int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosLSeek(fd, offset, whence);
int64_t ret = taosLSeek(fd, offset, whence);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
int32_t tfFtruncate(int64_t tfd, int64_t length) {
......@@ -119,5 +128,8 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosFtruncate(fd, length);
int32_t code = taosFtruncate(fd, length);
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
......@@ -560,6 +560,37 @@ void taosTmrCleanUp(void* handle) {
tmrDebug("%s timer controller is cleaned up.", ctrl->label);
ctrl->label[0] = 0;
// cancel all timers of this controller
for (size_t i = 0; i < timerMap.size; i++) {
timer_list_t* list = timerMap.slots + i;
lockTimerList(list);
tmr_obj_t* t = list->timers;
tmr_obj_t* prev = NULL;
while (t != NULL) {
tmr_obj_t* next = t->mnext;
if (t->ctrl != ctrl) {
prev = t;
t = next;
continue;
}
uint8_t state = atomic_val_compare_exchange_8(&t->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (state == TIMER_STATE_WAITING) {
removeFromWheel(t);
}
timerDecRef(t);
if (prev == NULL) {
list->timers = next;
} else {
prev->mnext = next;
}
t = next;
}
unlockTimerList(list);
}
pthread_mutex_lock(&tmrCtrlMutex);
ctrl->next = unusedTmrCtrl;
numOfTmrCtrl--;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册