未验证 提交 1c892e3e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17894 from taosdata/fix/TD-20161

enh: adjust tqueue and tworker log
......@@ -735,19 +735,17 @@ To prevent system resource from being exhausted by multiple concurrent streams,
| 44 | numOfVnodeQueryThreads | No | Yes |
| 45 | numOfVnodeStreamThreads | No | Yes |
| 46 | numOfVnodeFetchThreads | No | Yes |
| 47 | numOfVnodeWriteThreads | No | Yes |
| 48 | numOfVnodeSyncThreads | No | Yes |
| 49 | numOfVnodeRsmaThreads | No | Yes |
| 50 | numOfQnodeQueryThreads | No | Yes |
| 51 | numOfQnodeFetchThreads | No | Yes |
| 52 | numOfSnodeSharedThreads | No | Yes |
| 53 | numOfSnodeUniqueThreads | No | Yes |
| 54 | rpcQueueMemoryAllowed | No | Yes |
| 55 | logDir | Yes | Yes |
| 56 | minimalLogDirGB | Yes | Yes |
| 57 | numOfLogLines | Yes | Yes |
| 58 | asyncLog | Yes | Yes |
| 59 | logKeepDays | Yes | Yes |
| 47 | numOfVnodeRsmaThreads | No | Yes |
| 48 | numOfQnodeQueryThreads | No | Yes |
| 49 | numOfQnodeFetchThreads | No | Yes |
| 50 | numOfSnodeSharedThreads | No | Yes |
| 51 | numOfSnodeUniqueThreads | No | Yes |
| 52 | rpcQueueMemoryAllowed | No | Yes |
| 53 | logDir | Yes | Yes |
| 54 | minimalLogDirGB | Yes | Yes |
| 55 | numOfLogLines | Yes | Yes |
| 56 | asyncLog | Yes | Yes |
| 57 | logKeepDays | Yes | Yes |
| 60 | debugFlag | Yes | Yes |
| 61 | tmrDebugFlag | Yes | Yes |
| 62 | uDebugFlag | Yes | Yes |
......
......@@ -711,19 +711,17 @@ charset 的有效值是 UTF-8。
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
| 45 | numOfVnodeStreamThreads | 否 | 是 | |
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
| 47 | numOfVnodeWriteThreads | 否 | 是 | |
| 48 | numOfVnodeSyncThreads | 否 | 是 | |
| 49 | numOfVnodeRsmaThreads | 否 | 是 | |
| 50 | numOfQnodeQueryThreads | 否 | 是 | |
| 51 | numOfQnodeFetchThreads | 否 | 是 | |
| 52 | numOfSnodeSharedThreads | 否 | 是 | |
| 53 | numOfSnodeUniqueThreads | 否 | 是 | |
| 54 | rpcQueueMemoryAllowed | 否 | 是 | |
| 55 | logDir | 是 | 是 | |
| 56 | minimalLogDirGB | 是 | 是 | |
| 57 | numOfLogLines | 是 | 是 | |
| 58 | asyncLog | 是 | 是 | |
| 59 | logKeepDays | 是 | 是 | |
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
| 49 | numOfQnodeFetchThreads | 否 | 是 | |
| 50 | numOfSnodeSharedThreads | 否 | 是 | |
| 51 | numOfSnodeUniqueThreads | 否 | 是 | |
| 52 | rpcQueueMemoryAllowed | 否 | 是 | |
| 53 | logDir | 是 | 是 | |
| 54 | minimalLogDirGB | 是 | 是 | |
| 55 | numOfLogLines | 是 | 是 | |
| 56 | asyncLog | 是 | 是 | |
| 57 | logKeepDays | 是 | 是 | |
| 60 | debugFlag | 是 | 是 | |
| 61 | tmrDebugFlag | 是 | 是 | |
| 62 | uDebugFlag | 是 | 是 | |
......
......@@ -55,8 +55,6 @@ extern int32_t tsNumOfMnodeReadThreads;
extern int32_t tsNumOfVnodeQueryThreads;
extern int32_t tsNumOfVnodeStreamThreads;
extern int32_t tsNumOfVnodeFetchThreads;
extern int32_t tsNumOfVnodeWriteThreads;
extern int32_t tsNumOfVnodeSyncThreads;
extern int32_t tsNumOfVnodeRsmaThreads;
extern int32_t tsNumOfQnodeQueryThreads;
extern int32_t tsNumOfQnodeFetchThreads;
......
......@@ -59,6 +59,47 @@ typedef enum {
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode {
STaosQnode *next;
STaosQueue *queue;
int64_t timestamp;
int32_t size;
int8_t itype;
int8_t reserved[3];
char item[];
} STaosQnode;
typedef struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
} STaosQueue;
typedef struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
} STaosQset;
typedef struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
} STaosQall;
STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
......
......@@ -26,8 +26,9 @@ typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker {
int32_t id; // worker ID
TdThread thread; // thread
int32_t id; // worker id
int64_t pid; // thread pid
TdThread thread; // thread id
SQWorkerPool *pool;
} SQWorker;
......@@ -43,9 +44,10 @@ typedef struct SQWorkerPool {
typedef struct SWWorker {
int32_t id; // worker id
TdThread thread; // thread
int64_t pid; // thread pid
TdThread thread; // thread id
STaosQall *qall;
STaosQset *qset; // queue set
STaosQset *qset;
SWWorkerPool *pool;
} SWWorker;
......
......@@ -50,8 +50,6 @@ int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 1;
......@@ -374,14 +372,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores * 2;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeRsmaThreads = tsNumOfCores;
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1;
......@@ -506,22 +496,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeWriteThreads = numOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
pItem->i32 = tsNumOfVnodeWriteThreads;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeSyncThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeSyncThreads = numOfCores * 2;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16);
pItem->i32 = tsNumOfVnodeSyncThreads;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeRsmaThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeRsmaThreads = numOfCores;
......@@ -699,8 +673,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
......@@ -943,10 +915,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
*/
} else if (strcasecmp("numOfVnodeWriteThreads", name) == 0) {
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
} else if (strcasecmp("numOfVnodeRsmaThreads", name) == 0) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
......
......@@ -33,10 +33,6 @@ typedef struct SVnodeMgmt {
SQWorkerPool queryPool;
SQWorkerPool streamPool;
SWWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool syncCtrlPool;
SWWorkerPool writePool;
SWWorkerPool applyPool;
SSingleWorker mgmtWorker;
SHashObj *hash;
TdThreadRwlock lock;
......@@ -52,19 +48,19 @@ typedef struct {
} SWrapperCfg;
typedef struct {
int32_t vgId;
int32_t vgVersion;
int32_t refCount;
int8_t dropped;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pSyncCtrlQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pStreamQ;
STaosQueue *pFetchQ;
int32_t vgId;
int32_t vgVersion;
int32_t refCount;
int8_t dropped;
char *path;
SVnode *pImpl;
SMultiWorker pWriteW;
SMultiWorker pSyncW;
SMultiWorker pSyncCtrlW;
SMultiWorker pApplyW;
STaosQueue *pQueryQ;
STaosQueue *pStreamQ;
STaosQueue *pFetchQ;
} SVnodeObj;
typedef struct {
......
......@@ -79,29 +79,49 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0};
vnodePreClose(pVnode->pImpl);
taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode);
dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
dInfo("vgId:%d, pre close", pVnode->vgId);
vnodePreClose(pVnode->pImpl);
dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
while (pVnode->refCount > 0) taosMsleep(10);
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pWriteW);
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pSyncW);
dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId,
pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pSyncCtrlW);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pApplyW);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
......
......@@ -188,30 +188,20 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
#if 0 // tests for batch writes
if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
memcpy(pDup, pMsg, sizeof(SRpcMsg));
pDup->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
pDup->info.handle = NULL;
taosWriteQitem(pVnode->pWriteQ, pDup);
}
#endif
taosWriteQitem(pVnode->pWriteW.queue, pMsg);
}
break;
case SYNC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncQ, pMsg);
taosWriteQitem(pVnode->pSyncW.queue, pMsg);
break;
case SYNC_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncCtrlQ, pMsg);
taosWriteQitem(pVnode->pSyncCtrlW.queue, pMsg);
break;
case APPLY_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pApplyQ, pMsg);
taosWriteQitem(pVnode->pApplyW.queue, pMsg);
break;
default:
code = -1;
......@@ -276,13 +266,13 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
if (pVnode != NULL) {
switch (qtype) {
case WRITE_QUEUE:
size = taosQueueItemSize(pVnode->pWriteQ);
size = taosQueueItemSize(pVnode->pWriteW.queue);
break;
case SYNC_QUEUE:
size = taosQueueItemSize(pVnode->pSyncQ);
size = taosQueueItemSize(pVnode->pSyncW.queue);
break;
case APPLY_QUEUE:
size = taosQueueItemSize(pVnode->pApplyQ);
size = taosQueueItemSize(pVnode->pApplyW.queue);
break;
case QUERY_QUEUE:
size = taosQueueItemSize(pVnode->pQueryQ);
......@@ -306,40 +296,44 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
}
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ);
dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ);
dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ);
dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dDebug("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ);
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId);
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId);
dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlW.queue,
pVnode->pSyncCtrlW.queue->threadId);
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId);
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
return 0;
}
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pQueryQ = NULL;
pVnode->pStreamQ = NULL;
pVnode->pFetchQ = NULL;
......@@ -364,26 +358,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pFPool->max = tsNumOfVnodeFetchThreads;
if (tWWorkerInit(pFPool) != 0) return -1;
SWWorkerPool *pWPool = &pMgmt->writePool;
pWPool->name = "vnode-write";
pWPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
SWWorkerPool *pAPool = &pMgmt->applyPool;
pAPool->name = "vnode-apply";
pAPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pAPool) != 0) return -1;
SWWorkerPool *pSPool = &pMgmt->syncPool;
pSPool->name = "vnode-sync";
pSPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSPool) != 0) return -1;
SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
pSCPool->name = "vnode-sync-ctrl";
pSCPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSCPool) != 0) return -1;
SSingleWorkerCfg mgmtCfg = {
.min = 1,
.max = 1,
......@@ -398,10 +372,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
}
void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool);
tWWorkerCleanup(&pMgmt->syncCtrlPool);
tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->streamPool);
tWWorkerCleanup(&pMgmt->fetchPool);
......
......@@ -97,6 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
......
......@@ -242,12 +242,7 @@ _err:
return NULL;
}
void vnodePreClose(SVnode *pVnode) {
if (pVnode) {
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
}
}
void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); }
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
......
......@@ -342,52 +342,26 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, cons
TMSG_INFO(pMsg->msgType));
}
#define USE_TSDB_SNAPSHOT
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
return code;
#else
*ppReader = taosMemoryMalloc(32);
return 0;
#endif
}
static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderClose(pReader);
return code;
#else
taosMemoryFree(pReader);
return 0;
#endif
}
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
return code;
#else
static int32_t times = 0;
if (times++ < 5) {
*len = 64;
*ppBuf = taosMemoryMalloc(*len);
snprintf(*ppBuf, *len, "snapshot block %d", times);
} else {
*len = 0;
*ppBuf = NULL;
}
return 0;
#endif
}
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
......@@ -404,14 +378,9 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void
int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
return code;
#else
*ppWriter = taosMemoryMalloc(32);
return 0;
#endif
}
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
......@@ -419,22 +388,14 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
return code;
#else
taosMemoryFree(pWriter);
return 0;
#endif
}
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
return code;
#else
return 0;
#endif
}
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
......@@ -461,7 +422,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become follower", pVnode->config.vgId);
// clear old leader resource
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
pVnode->blocked = false;
......@@ -474,15 +434,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become leader", pVnode->config.vgId);
#if 0
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
pVnode->blocked = false;
tsem_post(&pVnode->syncSem);
}
taosThreadMutexUnlock(&pVnode->lock);
#endif
}
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
......@@ -543,12 +494,25 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
}
void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId);
vInfo("vgId:%d, start sync", pVnode->config.vgId);
syncStart(pVnode->sync);
}
void vnodeSyncPreClose(SVnode *pVnode) {
vInfo("vgId:%d, pre close sync", pVnode->config.vgId);
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
pVnode->blocked = false;
tsem_post(&pVnode->syncSem);
}
taosThreadMutexUnlock(&pVnode->lock);
}
void vnodeSyncClose(SVnode *pVnode) {
vDebug("vgId:%d, close sync", pVnode->config.vgId);
vInfo("vgId:%d, close sync", pVnode->config.vgId);
syncStop(pVnode->sync);
}
......
......@@ -21,46 +21,6 @@
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode {
STaosQnode *next;
STaosQueue *queue;
int64_t timestamp;
int32_t size;
int8_t itype;
int8_t reserved[3];
char item[];
} STaosQnode;
typedef struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
} STaosQueue;
typedef struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
} STaosQset;
typedef struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
} STaosQall;
STaosQueue *taosOpenQueue() {
STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
if (queue == NULL) {
......
......@@ -36,14 +36,13 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker->pool = pool;
}
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
return 0;
}
void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
// if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset);
}
......@@ -51,7 +50,6 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
// if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
......@@ -73,11 +71,13 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
worker->pid);
break;
}
......@@ -124,7 +124,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
}
taosThreadMutexUnlock(&pool->mutex);
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue;
}
......@@ -191,12 +191,14 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
if (numOfMsgs == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
worker->pid);
break;
}
......@@ -244,7 +246,9 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pool->nextId = (pool->nextId + 1) % pool->max;
}
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
while (worker->pid <= 0) taosMsleep(10);
queue->threadId = worker->pid;
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
code = 0;
_OVER:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册