提交 6b69b1a7 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

......@@ -39,6 +39,15 @@
#define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
......@@ -242,28 +251,85 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
return TSDB_CODE_SUCCESS;
}
static int32_t dnodeOpenVnodes() {
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0;
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes;
int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
if (vnodeOpen(vgId, vnodeDir) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
free(vnodeList);
return status;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(vnodeList);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS;
}
......@@ -273,7 +339,7 @@ void dnodeStartStream() {
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
return;
}
......@@ -292,7 +358,7 @@ static void dnodeCloseVnodes() {
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
dInfo("get dnode list failed");
free(vnodeList);
return;
}
......
......@@ -36,6 +36,7 @@ typedef struct {
int32_t min; // min number of workers
int32_t num; // current number of workers
SReadWorker *readWorker;
pthread_mutex_t mutex;
} SReadWorkerPool;
static void *dnodeProcessReadQueue(void *param);
......@@ -51,27 +52,28 @@ int32_t dnodeInitVnodeRead() {
readPool.min = 2;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
pthread_mutex_init(&readPool.mutex, NULL);
if (readPool.readWorker == NULL) return -1;
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
pWorker->workerId = i;
}
dInfo("dnode read is opened");
dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0;
}
void dnodeCleanupVnodeRead() {
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(readQset);
}
}
for (int i=0; i < readPool.max; ++i) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
......@@ -80,6 +82,7 @@ void dnodeCleanupVnodeRead() {
free(readPool.readWorker);
taosCloseQset(readQset);
pthread_mutex_destroy(&readPool.mutex);
dInfo("dnode read is closed");
}
......@@ -136,8 +139,12 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
void *dnodeAllocateVnodeRqueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex);
return NULL;
}
taosAddIntoQset(readQset, queue, pVnode);
......@@ -160,6 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
} while (readPool.num < readPool.min);
}
pthread_mutex_unlock(&readPool.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue);
return queue;
......
......@@ -47,6 +47,7 @@ typedef struct {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker;
pthread_mutex_t mutex;
} SWriteWorkerPool;
static void *dnodeProcessWriteQueue(void *param);
......@@ -58,25 +59,26 @@ int32_t dnodeInitVnodeWrite() {
wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
pthread_mutex_init(&wWorkerPool.mutex, NULL);
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
}
dInfo("dnode write is opened");
dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
return 0;
}
void dnodeCleanupVnodeWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall);
......@@ -84,6 +86,7 @@ void dnodeCleanupVnodeWrite() {
}
}
pthread_mutex_destroy(&wWorkerPool.mutex);
free(wWorkerPool.writeWorker);
dInfo("dnode write is closed");
}
......@@ -124,14 +127,19 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
}
void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_mutex_lock(&wWorkerPool.mutex);
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
void *queue = taosOpenQueue();
if (queue == NULL) return NULL;
if (queue == NULL) {
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) {
taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
......@@ -140,6 +148,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
pthread_attr_t thAttr;
......@@ -163,6 +172,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
pthread_mutex_unlock(&wWorkerPool.mutex);
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
return queue;
......@@ -201,6 +211,8 @@ static void *dnodeProcessWriteQueue(void *param) {
int type;
void *pVnode, *item;
dDebug("write worker:%d is running", pWorker->workerId);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {
......
......@@ -287,7 +287,7 @@ void sdbUpdateSync() {
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode != NULL) {
syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC;
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeEp);
tstrncpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
index++;
}
......@@ -976,11 +976,11 @@ static void *sdbWorkerFp(void *param) {
tstrerror(pOper->retCode));
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
}
taosFreeQitem(item);
}
......
......@@ -1254,13 +1254,13 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
char * pWrite;
int32_t cols = 0;
SSuperTableObj *pTable = NULL;
char prefix[20] = {0};
char prefix[64] = {0};
int32_t prefixLen;
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
strcpy(prefix, pDb->name);
tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix);
......@@ -1558,10 +1558,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pTable != NULL) {
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
}
assert(pTable);
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
if (code != TSDB_CODE_SUCCESS) return code;
......@@ -2370,7 +2370,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0};
strcpy(prefix, pDb->name);
tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix);
......@@ -2560,7 +2560,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0};
strcpy(prefix, pDb->name);
tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix);
......
......@@ -60,7 +60,7 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
int64_t res;
res = 367*((int64_t)mon)/12;
res += year/4 - year/100 + year/400 + day + year*365 - 719499;
res += year/4 - year/100 + year/400 + day + ((int64_t)year)*365 - 719499;
res = res*24;
res = ((res + hour) * 60 + min) * 60 + sec;
......
......@@ -601,10 +601,11 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
content = calloc(1, maxLen + 1);
if (content == NULL) goto PARSE_OVER;
int len = fread(content, 1, maxLen, fp);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
free(content);
fclose(fp);
return errno;
}
......@@ -649,7 +650,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId);
goto PARSE_OVER;
......
......@@ -112,7 +112,7 @@ echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册