提交 4cb813a9 编写于 作者: S Shengliang Guan

fix thread conflict

上级 3544df41
......@@ -71,13 +71,6 @@ int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad);
*/
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs);
/**
* @brief Drop a bnode.
*
* @param path Path of the bnode.
*/
void bndDestroy(const char *path);
#ifdef __cplusplus
}
#endif
......
......@@ -25,5 +25,3 @@ void bndClose(SBnode *pBnode) { free(pBnode); }
int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad) { return 0; }
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs) { return 0; }
void bndDestroy(const char *path) { taosRemoveDir(path); }
......@@ -29,7 +29,7 @@ typedef struct SDnodeMgmt {
SEpSet mnodeEpSet;
SHashObj *dnodeHash;
SArray *dnodeEps;
pthread_t *threadId;
TdThread *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
......
......@@ -30,7 +30,7 @@ static void *dmThreadRoutine(void *param) {
setThreadName("dnode-hb");
while (true) {
pthread_testcancel();
taosThreadTestCancel();
taosMsleep(200);
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) {
continue;
......
......@@ -28,7 +28,7 @@ class TestServer {
private:
SDnode* pDnode;
TdThread* threadId;
TdThread threadId;
char path[PATH_MAX];
char fqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];
......
......@@ -41,11 +41,11 @@ bool TestServer::DoStart() {
return false;
}
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threadId, &thAttr, serverLoop, pDnode);
pthread_attr_destroy(&thAttr);
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
taosThreadCreate(&threadId, &thAttr, serverLoop, pDnode);
taosThreadAttrDestroy(&thAttr);
taosMsleep(2100);
return true;
}
......@@ -69,7 +69,7 @@ bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const
void TestServer::Stop() {
dndHandleEvent(pDnode, DND_EVENT_STOP);
pthread_join(threadId, NULL);
taosThreadJoin(threadId, NULL);
if (pDnode != NULL) {
dndClose(pDnode);
......
......@@ -71,7 +71,7 @@ typedef struct {
int32_t opened;
int32_t failed;
int32_t threadIndex;
pthread_t thread;
TdThread thread;
SVnodesMgmt *pMgmt;
SWrapperCfg *pCfgs;
} SVnodeThread;
......
......@@ -193,20 +193,20 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, vmOpenVnodeFunc, pThread) != 0) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeFunc, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
taosThreadAttrDestroy(&thAttr);
}
for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
pthread_join(pThread->thread, NULL);
taosThreadJoin(pThread->thread, NULL);
}
free(pThread->pCfgs);
}
......
......@@ -41,16 +41,16 @@ typedef struct SProcQueue {
ProcConsumeFp consumeFp;
void *pParent;
tsem_t sem;
pthread_mutex_t *mutex;
TdThreadMutex *mutex;
int32_t mutexShmid;
int32_t bufferShmid;
const char *name;
} SProcQueue;
typedef struct SProcObj {
pthread_t childThread;
TdThread childThread;
SProcQueue *pChildQueue;
pthread_t parentThread;
TdThread parentThread;
SProcQueue *pParentQueue;
const char *name;
int32_t pid;
......@@ -59,11 +59,11 @@ typedef struct SProcObj {
bool testFlag;
} SProcObj;
static int32_t taosProcInitMutex(pthread_mutex_t **ppMutex, int32_t *pShmid) {
pthread_mutex_t *pMutex = NULL;
pthread_mutexattr_t mattr = {0};
int32_t shmid = -1;
int32_t code = -1;
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
TdThreadMutex *pMutex = NULL;
TdThreadMutexAttr mattr = {0};
int32_t shmid = -1;
int32_t code = -1;
if (pthread_mutexattr_init(&mattr) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -77,21 +77,21 @@ static int32_t taosProcInitMutex(pthread_mutex_t **ppMutex, int32_t *pShmid) {
goto _OVER;
}
shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), 0600);
if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmget since %s", terrstr());
goto _OVER;
}
pMutex = (pthread_mutex_t *)shmat(shmid, NULL, 0);
pMutex = (TdThreadMutex *)shmat(shmid, NULL, 0);
if (pMutex == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmat since %s", terrstr());
goto _OVER;
}
if (pthread_mutex_init(pMutex, &mattr) != 0) {
if (taosThreadMutexInit(pMutex, &mattr) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex since %s", terrstr());
goto _OVER;
......@@ -101,7 +101,7 @@ static int32_t taosProcInitMutex(pthread_mutex_t **ppMutex, int32_t *pShmid) {
_OVER:
if (code != 0) {
pthread_mutex_destroy(pMutex);
taosThreadMutexDestroy(pMutex);
shmctl(shmid, IPC_RMID, NULL);
} else {
*ppMutex = pMutex;
......@@ -112,9 +112,9 @@ _OVER:
return code;
}
static void taosProcDestroyMutex(pthread_mutex_t *pMutex, int32_t *pShmid) {
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t *pShmid) {
if (pMutex != NULL) {
pthread_mutex_destroy(pMutex);
taosThreadMutexDestroy(pMutex);
}
if (*pShmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL);
......@@ -129,7 +129,7 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
return -1;
}
void *shmptr = (pthread_mutex_t *)shmat(shmid, NULL, 0);
void *shmptr = shmat(shmid, NULL, 0);
if (shmptr == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init buffer while shmat since %s", terrstr());
......@@ -204,9 +204,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
pthread_mutex_lock(pQueue->mutex);
taosThreadMutexLock(pQueue->mutex);
if (fullLen > pQueue->avail) {
pthread_mutex_unlock(pQueue->mutex);
taosThreadMutexUnlock(pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1;
}
......@@ -252,7 +252,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
pQueue->avail -= fullLen;
pQueue->items++;
pthread_mutex_unlock(pQueue->mutex);
taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue);
......@@ -263,9 +263,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
int32_t *pBodyLen) {
tsem_wait(&pQueue->sem);
pthread_mutex_lock(pQueue->mutex);
taosThreadMutexLock(pQueue->mutex);
if (pQueue->total - pQueue->avail <= 0) {
pthread_mutex_unlock(pQueue->mutex);
taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem);
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1;
......@@ -284,7 +284,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
void *pHead = (*pQueue->mallocHeadFp)(headLen);
void *pBody = (*pQueue->mallocBodyFp)(bodyLen);
if (pHead == NULL || pBody == NULL) {
pthread_mutex_unlock(pQueue->mutex);
taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem);
(*pQueue->freeHeadFp)(pHead);
(*pQueue->freeBodyFp)(pBody);
......@@ -325,7 +325,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
pQueue->items--;
pthread_mutex_unlock(pQueue->mutex);
taosThreadMutexUnlock(pQueue->mutex);
*ppHead = pHead;
*ppBody = pBody;
......@@ -409,12 +409,12 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
}
int32_t taosProcRun(SProcObj *pProc) {
pthread_attr_t thAttr = {0};
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
return -1;
......@@ -423,7 +423,7 @@ int32_t taosProcRun(SProcObj *pProc) {
}
if (!pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
return -1;
......
......@@ -27,6 +27,7 @@ class UtilTestQueue : public ::testing::Test {
static void TearDownTestSuite() {}
};
#if 0
TEST_F(UtilTestQueue, 01_fork) {
pid_t pid;
int shmid;
......@@ -84,7 +85,7 @@ TEST_F(UtilTestQueue, 01_fork) {
perror("fork error");
exit(1);
} else if (pid == 0) {
if ((err = pthread_mutex_lock(m)) < 0) {
if ((err = taosThreadMutexLock(m)) < 0) {
printf("lock error:%s\n", strerror(err));
exit(1);
}
......@@ -93,14 +94,14 @@ TEST_F(UtilTestQueue, 01_fork) {
(*shmptr2)++;
}
if ((err = pthread_mutex_unlock(m)) < 0) {
if ((err = taosThreadMutexUnlock(m)) < 0) {
printf("unlock error:%s\n", strerror(err));
exit(1);
}
exit(0);
} else {
if ((err = pthread_mutex_lock(m)) < 0) {
if ((err = taosThreadMutexLock(m)) < 0) {
printf("lock error:%s\n", strerror(err));
exit(1);
}
......@@ -108,7 +109,7 @@ TEST_F(UtilTestQueue, 01_fork) {
**shmptr2 = i;
(*shmptr2)++;
}
if ((err = pthread_mutex_unlock(m)) < 0) {
if ((err = taosThreadMutexUnlock(m)) < 0) {
printf("unlock error:%s\n", strerror(err));
exit(1);
}
......@@ -122,9 +123,11 @@ TEST_F(UtilTestQueue, 01_fork) {
printf("\n");
pthread_mutexattr_destroy(&mattr);
taosThreadAttrDestroy(&mattr);
//销毁mutex
pthread_mutex_destroy(m);
exit(0);
}
\ No newline at end of file
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册