提交 8a722f72 编写于 作者: S Shengliang Guan

[TD-777] add lock while open vnodes

上级 03aa64e9
......@@ -39,6 +39,15 @@
#define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
......@@ -242,28 +251,85 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
return TSDB_CODE_SUCCESS;
}
static int32_t dnodeOpenVnodes() {
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0;
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes;
int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
if (vnodeOpen(vgId, vnodeDir) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
free(vnodeList);
return status;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(vnodeList);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS;
}
......@@ -273,7 +339,7 @@ void dnodeStartStream() {
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
return;
}
......@@ -292,7 +358,7 @@ static void dnodeCloseVnodes() {
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
free(vnodeList);
return;
}
......
......@@ -36,6 +36,7 @@ typedef struct {
int32_t min; // min number of workers
int32_t num; // current number of workers
SReadWorker *readWorker;
pthread_mutex_t mutex;
} SReadWorkerPool;
static void *dnodeProcessReadQueue(void *param);
......@@ -51,27 +52,28 @@ int32_t dnodeInitVnodeRead() {
readPool.min = 2;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
pthread_mutex_init(&readPool.mutex, NULL);
if (readPool.readWorker == NULL) return -1;
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
pWorker->workerId = i;
}
dInfo("dnode read is opened");
dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0;
}
void dnodeCleanupVnodeRead() {
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(readQset);
}
}
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
......@@ -80,6 +82,7 @@ void dnodeCleanupVnodeRead() {
free(readPool.readWorker);
taosCloseQset(readQset);
pthread_mutex_destroy(&readPool.mutex);
dInfo("dnode read is closed");
}
......@@ -136,8 +139,12 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
void *dnodeAllocateVnodeRqueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex);
return NULL;
}
taosAddIntoQset(readQset, queue, pVnode);
......@@ -160,6 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
} while (readPool.num < readPool.min);
}
pthread_mutex_unlock(&readPool.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue);
return queue;
......
......@@ -47,6 +47,7 @@ typedef struct {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker;
pthread_mutex_t mutex;
} SWriteWorkerPool;
static void *dnodeProcessWriteQueue(void *param);
......@@ -58,25 +59,26 @@ int32_t dnodeInitVnodeWrite() {
wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
pthread_mutex_init(&wWorkerPool.mutex, NULL);
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
}
dInfo("dnode write is opened");
dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
return 0;
}
void dnodeCleanupVnodeWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall);
......@@ -84,6 +86,7 @@ void dnodeCleanupVnodeWrite() {
}
}
pthread_mutex_destroy(&wWorkerPool.mutex);
free(wWorkerPool.writeWorker);
dInfo("dnode write is closed");
}
......@@ -124,14 +127,19 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
}
void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_mutex_lock(&wWorkerPool.mutex);
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
void *queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (queue == NULL) {
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) {
taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
......@@ -140,6 +148,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
pthread_attr_t thAttr;
......@@ -163,6 +172,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
pthread_mutex_unlock(&wWorkerPool.mutex);
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
return queue;
......@@ -201,6 +211,8 @@ static void *dnodeProcessWriteQueue(void *param) {
int type;
void *pVnode, *item;
dDebug("write worker:%d is running", pWorker->workerId);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {
......
......@@ -112,7 +112,7 @@ echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册