未验证 提交 f5e2d44f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4197 from taosdata/feature/wal

Feature/wal
......@@ -324,6 +324,8 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
int tsdbInitCommitQueue(int nthreads);
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
#ifdef __cplusplus
}
......
......@@ -132,7 +132,7 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
// if (leftbytes > 1000000000) leftbytes = 1000000000;
sentbytes = sendfile(dfd, sfd, offset, leftbytes);
if (sentbytes == -1) {
if (errno == EINTR) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
return -1;
......
......@@ -1222,7 +1222,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
......
......@@ -15,6 +15,7 @@
#include "os.h"
#include "tlist.h"
#include "tref.h"
#include "tsdbMain.h"
typedef struct {
......@@ -22,6 +23,7 @@ typedef struct {
pthread_mutex_t lock;
pthread_cond_t queueNotEmpty;
int nthreads;
int refCount;
SList * queue;
pthread_t * threads;
} SCommitQueue;
......@@ -103,7 +105,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
pthread_mutex_lock(&(pQueue->lock));
ASSERT(!pQueue->stop);
// ASSERT(pQueue->stop);
tdListAppendNode(pQueue->queue, pNode);
pthread_cond_signal(&(pQueue->queueNotEmpty));
......@@ -123,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) {
while (true) {
pNode = tdListPopHead(pQueue->queue);
if (pNode == NULL) {
if (pQueue->stop) {
if (pQueue->stop && pQueue->refCount <= 0) {
pthread_mutex_unlock(&(pQueue->lock));
goto _exit;
} else {
......@@ -145,3 +147,14 @@ static void *tsdbLoopCommit(void *arg) {
_exit:
return NULL;
}
void tsdbIncCommitRef(int vgId) {
int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1);
tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount);
}
void tsdbDecCommitRef(int vgId) {
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
}
\ No newline at end of file
......@@ -355,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS;
......@@ -446,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) {
tsem_destroy(&pVnode->sem);
free(pVnode);
tsdbDecCommitRef(vgId);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册