From 41d97eeea7257ce966d5e9c0a69e19b52f1afcd2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 9 Aug 2020 15:21:03 +0000 Subject: [PATCH] TD-1057 --- src/client/src/tscSql.c | 14 +++++++------- src/client/src/tscSub.c | 8 ++++---- src/client/src/tscSubquery.c | 4 ++-- src/client/src/tscUtil.c | 2 +- src/dnode/src/dnodeSystem.c | 8 ++++---- src/dnode/src/dnodeTelemetry.c | 8 ++++---- src/kit/taosdemo/taosdemo.c | 30 +++++++++++++++--------------- src/query/inc/qExecutor.h | 2 +- src/rpc/test/rclient.c | 10 +++++----- src/rpc/test/rsclient.c | 6 +++--- src/sync/src/tarbitrator.c | 8 ++++---- src/sync/test/syncClient.c | 10 +++++----- 12 files changed, 55 insertions(+), 55 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6b3653ff63..7bb9be5d5c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -141,7 +141,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = (SSqlObj *) tres; assert(pSql != NULL); - sem_post(&pSql->rspSem); + tsem_post(&pSql->rspSem); } TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { @@ -156,7 +156,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha pSql->param = pSql; tscProcessSql(pSql); - sem_wait(&pSql->rspSem); + tsem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { terrno = pSql->res.code; @@ -225,12 +225,12 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(tres != NULL); SSqlObj *pSql = (SSqlObj *) tres; - sem_post(&pSql->rspSem); + tsem_post(&pSql->rspSem); } static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; - sem_post(&pSql->rspSem); + tsem_post(&pSql->rspSem); } TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { @@ -439,7 +439,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pCmd->command == TSDB_SQL_CLI_VERSION || pCmd->command == TSDB_SQL_CURRENT_USER )) { taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); - sem_wait(&pSql->rspSem); + tsem_wait(&pSql->rspSem); } return doSetResultRowData(pSql, true); @@ -729,7 +729,7 @@ static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSqlObj *pSql = ((SSqlObj *)param); pSql->res.code = code; - sem_post(&pSql->rspSem); + tsem_post(&pSql->rspSem); } int taos_validate_sql(TAOS *taos, const char *sql) { @@ -780,7 +780,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { pSql->param = pSql; int code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - sem_wait(&pSql->rspSem); + tsem_wait(&pSql->rspSem); code = pSql->res.code; } if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index e9f2c1dc1d..608551c7f3 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -33,7 +33,7 @@ typedef struct SSubscriptionProgress { typedef struct SSub { void * signature; char topic[32]; - sem_t sem; + tsem_t sem; int64_t lastSyncTime; int64_t lastConsumeTime; TAOS * taos; @@ -85,7 +85,7 @@ static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSub *pSub = ((SSub *)param); pSub->pSql->res.code = code; - sem_post(&pSub->sem); + tsem_post(&pSub->sem); } @@ -154,7 +154,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - sem_wait(&pSub->sem); + tsem_wait(&pSub->sem); code = pSql->res.code; } if (code != TSDB_CODE_SUCCESS) { @@ -451,7 +451,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->fetchFp = asyncCallback; pSql->param = pSub; tscDoQuery(pSql); - sem_wait(&pSub->sem); + tsem_wait(&pSub->sem); if (pRes->code != TSDB_CODE_SUCCESS) { continue; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8a596d8893..4e188d4fb6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2057,7 +2057,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { } doBuildResFromSubqueries(pSql); - sem_post(&pSql->rspSem); + tsem_post(&pSql->rspSem); return; @@ -2083,7 +2083,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { // free(pState); // // pRes->completed = true; // set query completed -// sem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); // return; // } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1b6d18be0c..582411fc0c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -387,7 +387,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pCmd->allocSize = 0; taosTFree(pSql->sqlstr); - sem_destroy(&pSql->rspSem); + tsem_destroy(&pSql->rspSem); free(pSql); } diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 2519684878..543e1c9639 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -22,7 +22,7 @@ #include "dnodeMain.h" static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); -static sem_t exitSem; +static tsem_t exitSem; int32_t main(int32_t argc, char *argv[]) { // Set global configuration file @@ -88,7 +88,7 @@ int32_t main(int32_t argc, char *argv[]) { #endif } - if (sem_init(&exitSem, 0, 0) != 0) { + if (tsem_init(&exitSem, 0, 0) != 0) { printf("failed to create exit semphore\n"); exit(EXIT_FAILURE); } @@ -117,7 +117,7 @@ int32_t main(int32_t argc, char *argv[]) { syslog(LOG_INFO, "Started TDengine service successfully."); - for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) { + for (int res = tsem_wait(&exitSem); res != 0; res = tsem_wait(&exitSem)) { if (res != EINTR) { syslog(LOG_ERR, "failed to wait exit semphore: %d", res); break; @@ -157,5 +157,5 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { sigaction(SIGUSR2, &act, NULL); // inform main thread to exit - sem_post(&exitSem); + tsem_post(&exitSem); } diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index 892fd1d903..8ed4a9518b 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -36,7 +36,7 @@ #include "dnodeInt.h" #include "dnodeTelemetry.h" -static sem_t tsExitSem; +static tsem_t tsExitSem; static pthread_t tsTelemetryThread; #define TELEMETRY_SERVER "telemetry.taosdata.com" @@ -266,7 +266,7 @@ int32_t dnodeInitTelemetry() { return 0; } - if (sem_init(&tsExitSem, 0, 0) == -1) { + if (tsem_init(&tsExitSem, 0, 0) == -1) { // just log the error, it is ok for telemetry to fail dTrace("failed to create semaphore for telemetry, reason:%s", strerror(errno)); return 0; @@ -291,8 +291,8 @@ void dnodeCleanupTelemetry() { } if (tsTelemetryThread) { - sem_post(&tsExitSem); + tsem_post(&tsExitSem); pthread_join(tsTelemetryThread, NULL); - sem_destroy(&tsExitSem); + tsem_destroy(&tsExitSem); } } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9d46ac5055..859e22a178 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -440,9 +440,9 @@ typedef struct { char* cols; bool use_metric; - sem_t mutex_sem; + tsem_t mutex_sem; int notFinished; - sem_t lock_sem; + tsem_t lock_sem; } info; typedef struct { @@ -459,9 +459,9 @@ typedef struct { int data_of_order; int data_of_rate; - sem_t *mutex_sem; - int *notFinished; - sem_t *lock_sem; + tsem_t *mutex_sem; + int *notFinished; + tsem_t *lock_sem; } sTable; /* ******************************* Global @@ -729,9 +729,9 @@ int main(int argc, char *argv[]) { t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; - sem_init(&(t_info->mutex_sem), 0, 1); + tsem_init(&(t_info->mutex_sem), 0, 1); t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1; - sem_init(&(t_info->lock_sem), 0, 0); + tsem_init(&(t_info->lock_sem), 0, 0); if (query_mode == SYNC) { pthread_create(pids + i, NULL, syncWrite, t_info); @@ -762,8 +762,8 @@ int main(int argc, char *argv[]) { for (int i = 0; i < threads; i++) { info *t_info = infos + i; taos_close(t_info->taos); - sem_destroy(&(t_info->mutex_sem)); - sem_destroy(&(t_info->lock_sem)); + tsem_destroy(&(t_info->mutex_sem)); + tsem_destroy(&(t_info->lock_sem)); } free(pids); @@ -1021,8 +1021,8 @@ void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntable for (int i = 0; i < threads; i++) { info *t_info = infos + i; - sem_destroy(&(t_info->mutex_sem)); - sem_destroy(&(t_info->lock_sem)); + tsem_destroy(&(t_info->mutex_sem)); + tsem_destroy(&(t_info->lock_sem)); } free(pids); @@ -1272,7 +1272,7 @@ void *asyncWrite(void *sarg) { taos_query_a(winfo->taos, "show databases", callBack, tb_info); } - sem_wait(&(winfo->lock_sem)); + tsem_wait(&(winfo->lock_sem)); free(tb_infos); return NULL; @@ -1292,10 +1292,10 @@ void callBack(void *param, TAOS_RES *res, int code) { // If finished; if (tb_info->counter >= tb_info->target) { - sem_wait(tb_info->mutex_sem); + tsem_wait(tb_info->mutex_sem); (*(tb_info->notFinished))--; - if (*(tb_info->notFinished) == 0) sem_post(tb_info->lock_sem); - sem_post(tb_info->mutex_sem); + if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem); + tsem_post(tb_info->mutex_sem); return; } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index bd2e0a4470..9757036783 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -186,7 +186,7 @@ typedef struct SQInfo { void* signature; int32_t pointsInterpo; int32_t code; // error code to returned to client -// sem_t dataReady; +//tsem_t dataReady; void* tsdb; int32_t vgId; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 6ec2d82445..7a963e9ce4 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -26,8 +26,8 @@ typedef struct { int num; int numOfReqs; int msgSize; - sem_t rspSem; - sem_t *pOverSem; + tsem_t rspSem; + tsem_t *pOverSem; pthread_t thread; void *pRpc; } SInfo; @@ -39,7 +39,7 @@ static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); - sem_post(&pInfo->rspSem); + tsem_post(&pInfo->rspSem); } static int tcount = 0; @@ -60,7 +60,7 @@ static void *sendRequest(void *param) { rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); if ( pInfo->num % 20000 == 0 ) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - sem_wait(&pInfo->rspSem); + tsem_wait(&pInfo->rspSem); } tDebug("thread:%d, it is over", pInfo->index); @@ -171,7 +171,7 @@ int main(int argc, char *argv[]) { pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; - sem_init(&pInfo->rspSem, 0, 0); + tsem_init(&pInfo->rspSem, 0, 0); pInfo->pRpc = pRpc; pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); pInfo++; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 6e6961784b..a152d8e4a5 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -27,8 +27,8 @@ typedef struct { int num; int numOfReqs; int msgSize; - sem_t rspSem; - sem_t *pOverSem; + tsem_t rspSem; + tsem_t *pOverSem; pthread_t thread; void *pRpc; } SInfo; @@ -171,7 +171,7 @@ int main(int argc, char *argv[]) { pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; - sem_init(&pInfo->rspSem, 0, 0); + tsem_init(&pInfo->rspSem, 0, 0); pInfo->pRpc = pRpc; pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); pInfo++; diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 3c6db88a9c..3538391a94 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -31,7 +31,7 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); static void arbProcessBrokenLink(void *param); static int arbProcessPeerMsg(void *param, void *buffer); -static sem_t tsArbSem; +static tsem_t tsArbSem; static ttpool_h tsArbTcpPool; typedef struct { @@ -61,7 +61,7 @@ int main(int argc, char *argv[]) { } } - if (sem_init(&tsArbSem, 0, 0) != 0) { + if (tsem_init(&tsArbSem, 0, 0) != 0) { printf("failed to create exit semphore\n"); exit(EXIT_FAILURE); } @@ -98,7 +98,7 @@ int main(int argc, char *argv[]) { sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsServerPort); - for (int res = sem_wait(&tsArbSem); res != 0; res = sem_wait(&tsArbSem)) { + for (int res = tsem_wait(&tsArbSem); res != 0; res = tsem_wait(&tsArbSem)) { if (res != EINTR) break; } @@ -185,6 +185,6 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); // inform main thread to exit - sem_post(&tsArbSem); + tsem_post(&tsArbSem); } diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c index cd873b758b..16053d1088 100644 --- a/src/sync/test/syncClient.c +++ b/src/sync/test/syncClient.c @@ -25,8 +25,8 @@ typedef struct { int num; int numOfReqs; int msgSize; - sem_t rspSem; - sem_t *pOverSem; + tsem_t rspSem; + tsem_t *pOverSem; pthread_t thread; void *pRpc; } SInfo; @@ -38,7 +38,7 @@ void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); - sem_post(&pInfo->rspSem); + tsem_post(&pInfo->rspSem); } int tcount = 0; @@ -59,7 +59,7 @@ void *sendRequest(void *param) { rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); if ( pInfo->num % 20000 == 0 ) uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - sem_wait(&pInfo->rspSem); + tsem_wait(&pInfo->rspSem); } uDebug("thread:%d, it is over", pInfo->index); @@ -169,7 +169,7 @@ int main(int argc, char *argv[]) { pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; - sem_init(&pInfo->rspSem, 0, 0); + tsem_init(&pInfo->rspSem, 0, 0); pInfo->pRpc = pRpc; pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); pInfo++; -- GitLab