提交 cbb350b1 编写于 作者: H hjxilinx

Merge branch 'develop' into feature/liaohj

...@@ -269,13 +269,14 @@ public Connection getConn() throws Exception{ ...@@ -269,13 +269,14 @@ public Connection getConn() throws Exception{
用户可以在源代码的src/connector/python文件夹下找到python2和python3的安装包。用户可以通过pip命令安装: 用户可以在源代码的src/connector/python文件夹下找到python2和python3的安装包。用户可以通过pip命令安装:
`pip install src/connector/python/python2/` `pip install src/connector/python/[linux|windows]/python2/`
`pip install src/connector/python/python3/` `pip install src/connector/python/[linux|windows]/python3/`
如果机器上没有pip命令,用户可将src/connector/python/python3或src/connector/python/python2下的taos文件夹拷贝到应用程序的目录使用。 如果机器上没有pip命令,用户可将src/connector/python/python3或src/connector/python/python2下的taos文件夹拷贝到应用程序的目录使用。
对于windows 客户端,安装TDengine windows 客户端后,将C:\TDengine\driver\taos.dll拷贝到C:\windows\system32目录下即可。所有TDengine的连接器,均需依赖taos.dll。
### Python客户端接口 ### Python客户端接口
......
...@@ -61,13 +61,13 @@ jmethodID g_rowdataSetByteArrayFp; ...@@ -61,13 +61,13 @@ jmethodID g_rowdataSetByteArrayFp;
void jniGetGlobalMethod(JNIEnv *env) { void jniGetGlobalMethod(JNIEnv *env) {
// make sure init function executed once // make sure init function executed once
switch (__sync_val_compare_and_swap_32(&__init, 0, 1)) { switch (atomic_val_compare_exchange_32(&__init, 0, 1)) {
case 0: case 0:
break; break;
case 1: case 1:
do { do {
taosMsleep(0); taosMsleep(0);
} while (__sync_val_load_32(&__init) == 1); } while (atomic_load_32(&__init) == 1);
case 2: case 2:
return; return;
} }
...@@ -107,7 +107,7 @@ void jniGetGlobalMethod(JNIEnv *env) { ...@@ -107,7 +107,7 @@ void jniGetGlobalMethod(JNIEnv *env) {
g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V"); g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V");
(*env)->DeleteLocalRef(env, rowdataClass); (*env)->DeleteLocalRef(env, rowdataClass);
__sync_val_restore_32(&__init, 2); atomic_store_32(&__init, 2);
jniTrace("native method register finished"); jniTrace("native method register finished");
} }
......
...@@ -353,7 +353,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) { ...@@ -353,7 +353,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) {
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
pSqlObj->res.code = abs(pSupporter->pState->code); pSqlObj->res.code = abs(pSupporter->pState->code);
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
...@@ -412,7 +412,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -412,7 +412,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a(tres, joinRetrieveCallback, param); taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore } else if (numOfRows == 0) { // no data from this vnode anymore
if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres,
...@@ -451,7 +451,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -451,7 +451,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
} }
if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code); tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = abs(pSupporter->pState->code); pParentSql->res.code = abs(pSupporter->pState->code);
...@@ -560,7 +560,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -560,7 +560,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
// if (__sync_add_and_fetch_32(pSupporter->numOfComplete, 1) >= // if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >=
// pSupporter->numOfTotal) { // pSupporter->numOfTotal) {
// SSqlObj *pParentObj = pSupporter->pObj; // SSqlObj *pParentObj = pSupporter->pObj;
// //
...@@ -605,7 +605,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -605,7 +605,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pParentSql, pSupporter);
} else { } else {
if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
if (pParentSql->fp == NULL) { if (pParentSql->fp == NULL) {
......
...@@ -432,11 +432,10 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -432,11 +432,10 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
// there is no more result, so we release all allocated resource // there is no more result, so we release all allocated resource
SLocalReducer *pLocalReducer = SLocalReducer *pLocalReducer = (SLocalReducer*)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
(SLocalReducer *)__sync_val_compare_and_swap_64(&pRes->pLocalReducer, pRes->pLocalReducer, 0);
if (pLocalReducer != NULL) { if (pLocalReducer != NULL) {
int32_t status = 0; int32_t status = 0;
while ((status = __sync_val_compare_and_swap_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY,
TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) { TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) {
taosMsleep(100); taosMsleep(100);
tscTrace("%p waiting for delete procedure, status: %d", pSql, status); tscTrace("%p waiting for delete procedure, status: %d", pSql, status);
...@@ -1328,7 +1327,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) { ...@@ -1328,7 +1327,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
// set the data merge in progress // set the data merge in progress
int32_t prevStatus = int32_t prevStatus =
__sync_val_compare_and_swap_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) { if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1030,13 +1030,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq ...@@ -1030,13 +1030,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscProcessSql(pNew); tscProcessSql(pNew);
return; return;
} else { // reach the maximum retry count, abort } else { // reach the maximum retry count, abort
__sync_val_compare_and_swap_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
numOfRows, idx, trsupport->pState->code); numOfRows, idx, trsupport->pState->code);
} }
} }
if (__sync_add_and_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) {
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
} }
...@@ -1102,7 +1102,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -1102,7 +1102,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
if (numOfRows > 0) { if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
__sync_add_and_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows); atomic_add_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, trsupport->pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); pRes->numOfRows, trsupport->pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
...@@ -1161,7 +1161,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -1161,7 +1161,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} }
if (__sync_add_and_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) {
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
} }
...@@ -1290,7 +1290,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1290,7 +1290,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code); tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code);
__sync_val_compare_and_swap_32(&trsupport->pState->code, 0, code); atomic_val_compare_exchange_32(&trsupport->pState->code, 0, code);
} else { // does not reach the maximum retry count, go on } else { // does not reach the maximum retry count, go on
tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry); tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry);
......
...@@ -546,7 +546,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -546,7 +546,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
void taos_close_stream(TAOS_STREAM *handle) { void taos_close_stream(TAOS_STREAM *handle) {
SSqlStream *pStream = (SSqlStream *)handle; SSqlStream *pStream = (SSqlStream *)handle;
SSqlObj *pSql = (SSqlObj *)__sync_val_compare_and_swap_64(&pStream->pSql, pStream->pSql, 0); SSqlObj *pSql = (SSqlObj *)atomic_exchange_ptr(&pStream->pSql, 0);
if (pSql == NULL) { if (pSql == NULL) {
return; return;
} }
......
...@@ -182,51 +182,45 @@ void taos_init_imp() { ...@@ -182,51 +182,45 @@ void taos_init_imp() {
void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_init() { pthread_once(&tscinit, taos_init_imp); }
int taos_options(TSDB_OPTION option, const void *arg, ...) { static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
char * pStr = NULL; SGlobalConfig *cfg = NULL;
SGlobalConfig *cfg_configDir = tsGetConfigOption("configDir");
SGlobalConfig *cfg_activetimer = tsGetConfigOption("shellActivityTimer");
SGlobalConfig *cfg_locale = tsGetConfigOption("locale");
SGlobalConfig *cfg_charset = tsGetConfigOption("charset");
SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone");
SGlobalConfig *cfg_socket = tsGetConfigOption("sockettype");
switch (option) { switch (option) {
case TSDB_OPTION_CONFIGDIR: case TSDB_OPTION_CONFIGDIR:
pStr = (char *)arg; cfg = tsGetConfigOption("configDir");
if (cfg_configDir && cfg_configDir->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
strncpy(configDir, pStr, TSDB_FILENAME_LEN); strncpy(configDir, pStr, TSDB_FILENAME_LEN);
cfg_configDir->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
tscPrint("set config file directory:%s", pStr); tscPrint("set config file directory:%s", pStr);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_configDir->option, pStr, tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
tsCfgStatusStr[cfg_configDir->cfgStatus], (char *)cfg_configDir->ptr); tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
case TSDB_OPTION_SHELL_ACTIVITY_TIMER: case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
if (cfg_activetimer && cfg_activetimer->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { cfg = tsGetConfigOption("shellActivityTimer");
tsShellActivityTimer = atoi((char *)arg); if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
tsShellActivityTimer = atoi(pStr);
if (tsShellActivityTimer < 1) tsShellActivityTimer = 1; if (tsShellActivityTimer < 1) tsShellActivityTimer = 1;
if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600; if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600;
cfg_activetimer->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
tscPrint("set shellActivityTimer:%d", tsShellActivityTimer); tscPrint("set shellActivityTimer:%d", tsShellActivityTimer);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg_activetimer->option, pStr, tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
tsCfgStatusStr[cfg_activetimer->cfgStatus], (int32_t *)cfg_activetimer->ptr); tsCfgStatusStr[cfg->cfgStatus], (int32_t *)cfg->ptr);
} }
break; break;
case TSDB_OPTION_LOCALE: { // set locale case TSDB_OPTION_LOCALE: { // set locale
pStr = (char *)arg; cfg = tsGetConfigOption("locale");
size_t len = strlen(pStr); size_t len = strlen(pStr);
if (len == 0 || len > TSDB_LOCALE_LEN) { if (len == 0 || len > TSDB_LOCALE_LEN) {
tscPrint("Invalid locale:%s, use default", pStr); tscPrint("Invalid locale:%s, use default", pStr);
return -1; return -1;
} }
if (cfg_locale && cfg_charset && cfg_locale->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg && cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
char sep = '.'; char sep = '.';
if (strlen(tsLocale) == 0) { // locale does not set yet if (strlen(tsLocale) == 0) { // locale does not set yet
...@@ -239,7 +233,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -239,7 +233,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
if (locale != NULL) { if (locale != NULL) {
tscPrint("locale set, prev locale:%s, new locale:%s", tsLocale, locale); tscPrint("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
cfg_locale->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
} else { // set the user-specified localed failed, use default LC_CTYPE as current locale } else { // set the user-specified localed failed, use default LC_CTYPE as current locale
locale = setlocale(LC_CTYPE, tsLocale); locale = setlocale(LC_CTYPE, tsLocale);
tscPrint("failed to set locale:%s, current locale:%s", pStr, tsLocale); tscPrint("failed to set locale:%s, current locale:%s", pStr, tsLocale);
...@@ -261,7 +255,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -261,7 +255,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
} }
strncpy(tsCharset, charset, tListLen(tsCharset)); strncpy(tsCharset, charset, tListLen(tsCharset));
cfg_charset->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
} else { } else {
tscPrint("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset); tscPrint("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset);
...@@ -272,23 +266,22 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -272,23 +266,22 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
tscPrint("charset remains:%s", tsCharset); tscPrint("charset remains:%s", tsCharset);
} }
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_locale->option, pStr, tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
tsCfgStatusStr[cfg_locale->cfgStatus], (char *)cfg_locale->ptr); tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
} }
case TSDB_OPTION_CHARSET: { case TSDB_OPTION_CHARSET: {
/* set charset will override the value of charset, assigned during system locale changed */ /* set charset will override the value of charset, assigned during system locale changed */
pStr = (char *)arg; cfg = tsGetConfigOption("charset");
size_t len = strlen(pStr); size_t len = strlen(pStr);
if (len == 0 || len > TSDB_LOCALE_LEN) { if (len == 0 || len > TSDB_LOCALE_LEN) {
tscPrint("failed to set charset:%s", pStr); tscPrint("failed to set charset:%s", pStr);
return -1; return -1;
} }
if (cfg_charset && cfg_charset->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (taosValidateEncodec(pStr)) { if (taosValidateEncodec(pStr)) {
if (strlen(tsCharset) == 0) { if (strlen(tsCharset) == 0) {
tscPrint("charset is set:%s", pStr); tscPrint("charset is set:%s", pStr);
...@@ -297,40 +290,41 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -297,40 +290,41 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
} }
strncpy(tsCharset, pStr, tListLen(tsCharset)); strncpy(tsCharset, pStr, tListLen(tsCharset));
cfg_charset->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
} else { } else {
tscPrint("charset:%s not valid", pStr); tscPrint("charset:%s not valid", pStr);
} }
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_charset->option, pStr, tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
tsCfgStatusStr[cfg_charset->cfgStatus], (char *)cfg_charset->ptr); tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
} }
case TSDB_OPTION_TIMEZONE: case TSDB_OPTION_TIMEZONE:
pStr = (char *)arg; cfg = tsGetConfigOption("timezone");
if (cfg_timezone && cfg_timezone->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
strcpy(tsTimezone, pStr); strcpy(tsTimezone, pStr);
tsSetTimeZone(); tsSetTimeZone();
cfg_timezone->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
tscTrace("timezone set:%s, input:%s by taos_options", tsTimezone, pStr); tscTrace("timezone set:%s, input:%s by taos_options", tsTimezone, pStr);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_timezone->option, pStr, tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
tsCfgStatusStr[cfg_timezone->cfgStatus], (char *)cfg_timezone->ptr); tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
case TSDB_OPTION_SOCKET_TYPE: case TSDB_OPTION_SOCKET_TYPE:
if (cfg_socket && cfg_socket->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { cfg = tsGetConfigOption("sockettype");
if (strcasecmp(arg, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(arg, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
return -1; return -1;
} }
strncpy(tsSocketType, arg, tListLen(tsSocketType)); strncpy(tsSocketType, pStr, tListLen(tsSocketType));
cfg_socket->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
tscPrint("socket type is set:%s", tsSocketType); tscPrint("socket type is set:%s", tsSocketType);
} }
break; break;
...@@ -342,3 +336,20 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -342,3 +336,20 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
return 0; return 0;
} }
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) {
if (i % 1000 == 0) {
tscPrint("haven't acquire lock after spin %d times.", i);
sched_yield();
}
}
int ret = taos_options_imp(option, (const char*)arg);
atomic_store_32(&lock, 0);
return ret;
}
\ No newline at end of file
...@@ -247,7 +247,7 @@ typedef struct { ...@@ -247,7 +247,7 @@ typedef struct {
extern SGlobalConfig *tsGlobalConfig; extern SGlobalConfig *tsGlobalConfig;
extern int tsGlobalConfigNum; extern int tsGlobalConfigNum;
extern char * tsCfgStatusStr[]; extern char * tsCfgStatusStr[];
SGlobalConfig *tsGetConfigOption(char *option); SGlobalConfig *tsGetConfigOption(const char *option);
#define TSDB_CFG_MAX_NUM 110 #define TSDB_CFG_MAX_NUM 110
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
......
...@@ -72,7 +72,7 @@ void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) { ...@@ -72,7 +72,7 @@ void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) {
} }
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) { bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (__sync_val_compare_and_swap_32(&pContext->state, srcState, destState) == srcState); return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
} }
void httpFreeContext(HttpServer *pServer, HttpContext *pContext); void httpFreeContext(HttpServer *pServer, HttpContext *pContext);
...@@ -124,7 +124,7 @@ void httpCleanUpContextTimer(HttpContext *pContext) { ...@@ -124,7 +124,7 @@ void httpCleanUpContextTimer(HttpContext *pContext) {
void httpCleanUpContext(HttpContext *pContext) { void httpCleanUpContext(HttpContext *pContext) {
httpTrace("context:%p, start the clean up operation", pContext); httpTrace("context:%p, start the clean up operation", pContext);
__sync_val_compare_and_swap_64(&pContext->signature, pContext, 0); atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0);
if (pContext->signature != NULL) { if (pContext->signature != NULL) {
httpTrace("context:%p is freed by another thread.", pContext); httpTrace("context:%p is freed by another thread.", pContext);
return; return;
...@@ -494,7 +494,7 @@ void httpProcessHttpData(void *param) { ...@@ -494,7 +494,7 @@ void httpProcessHttpData(void *param) {
} else { } else {
if (httpReadData(pThread, pContext)) { if (httpReadData(pThread, pContext)) {
(*(pThread->processData))(pContext); (*(pThread->processData))(pContext);
__sync_fetch_and_add(&pThread->pServer->requestNum, 1); atomic_fetch_add_32(&pThread->pServer->requestNum, 1);
} }
} }
} }
......
...@@ -77,7 +77,7 @@ int httpStartSystem() { ...@@ -77,7 +77,7 @@ int httpStartSystem() {
if (httpServer == NULL) { if (httpServer == NULL) {
httpError("http server is null"); httpError("http server is null");
return -1; httpInitSystem();
} }
if (httpServer->pContextPool == NULL) { if (httpServer->pContextPool == NULL) {
...@@ -148,7 +148,7 @@ void httpCleanUpSystem() { ...@@ -148,7 +148,7 @@ void httpCleanUpSystem() {
void httpGetReqCount(int32_t *httpReqestNum) { void httpGetReqCount(int32_t *httpReqestNum) {
if (httpServer != NULL) { if (httpServer != NULL) {
*httpReqestNum = __sync_fetch_and_and(&httpServer->requestNum, 0); *httpReqestNum = atomic_exchange_32(&httpServer->requestNum, 0);
} else { } else {
*httpReqestNum = 0; *httpReqestNum = 0;
} }
......
...@@ -95,6 +95,9 @@ int monitorInitSystem() { ...@@ -95,6 +95,9 @@ int monitorInitSystem() {
} }
int monitorStartSystem() { int monitorStartSystem() {
if (monitor == NULL) {
monitorInitSystem();
}
taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &monitor->initTimer); taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &monitor->initTimer);
return 0; return 0;
} }
......
...@@ -73,28 +73,71 @@ ...@@ -73,28 +73,71 @@
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler #define atomic_val_compare_exchange_8 __sync_val_compare_and_swap
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. #define atomic_val_compare_exchange_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define atomic_val_compare_exchange_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap #define atomic_val_compare_exchange_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap #define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap #define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_64 __sync_add_and_fetch #define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_32 __sync_add_and_fetch #define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_16 __sync_add_and_fetch #define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch #define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch #define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch #define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch #define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
int32_t __sync_val_load_32(int32_t *ptr); #define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
void __sync_val_restore_32(int32_t *ptr, int32_t newval); #define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define SWAP(a, b, c) \ #define SWAP(a, b, c) \
do { \ do { \
......
...@@ -416,11 +416,3 @@ int tsem_post(dispatch_semaphore_t *sem) { ...@@ -416,11 +416,3 @@ int tsem_post(dispatch_semaphore_t *sem) {
int tsem_destroy(dispatch_semaphore_t *sem) { int tsem_destroy(dispatch_semaphore_t *sem) {
return 0; return 0;
} }
int32_t __sync_val_load_32(int32_t *ptr) {
return __atomic_load_n(ptr, __ATOMIC_ACQUIRE);
}
void __sync_val_restore_32(int32_t *ptr, int32_t newval) {
__atomic_store_n(ptr, newval, __ATOMIC_RELEASE);
}
\ No newline at end of file
...@@ -92,28 +92,71 @@ extern "C" { ...@@ -92,28 +92,71 @@ extern "C" {
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler #define atomic_val_compare_exchange_8 __sync_val_compare_and_swap
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. #define atomic_val_compare_exchange_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define atomic_val_compare_exchange_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap #define atomic_val_compare_exchange_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap #define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap #define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_64 __sync_add_and_fetch #define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_32 __sync_add_and_fetch #define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_16 __sync_add_and_fetch #define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch #define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch #define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch #define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch #define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
int32_t __sync_val_load_32(int32_t *ptr); #define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
void __sync_val_restore_32(int32_t *ptr, int32_t newval); #define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define SWAP(a, b, c) \ #define SWAP(a, b, c) \
do { \ do { \
......
...@@ -340,11 +340,3 @@ bool taosSkipSocketCheck() { ...@@ -340,11 +340,3 @@ bool taosSkipSocketCheck() {
return false; return false;
} }
int32_t __sync_val_load_32(int32_t *ptr) {
return __atomic_load_n(ptr, __ATOMIC_ACQUIRE);
}
void __sync_val_restore_32(int32_t *ptr, int32_t newval) {
__atomic_store_n(ptr, newval, __ATOMIC_RELEASE);
}
...@@ -81,6 +81,10 @@ extern "C" { ...@@ -81,6 +81,10 @@ extern "C" {
#if defined(_M_ARM) || defined(_M_ARM64) #if defined(_M_ARM) || defined(_M_ARM64)
/* the '__iso_volatile' functions does not use a memory fence, so these
* definitions are incorrect, comment out as we don't support Windows on
* ARM at present.
#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr)) #define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr))
#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr)) #define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr))
#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr)) #define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr))
...@@ -98,7 +102,7 @@ extern "C" { ...@@ -98,7 +102,7 @@ extern "C" {
#define atomic_load_ptr atomic_load_32 #define atomic_load_ptr atomic_load_32
#define atomic_store_ptr atomic_store_32 #define atomic_store_ptr atomic_store_32
#endif #endif
*/
#else #else
#define atomic_load_8(ptr) (*(char volatile*)(ptr)) #define atomic_load_8(ptr) (*(char volatile*)(ptr))
...@@ -121,35 +125,152 @@ extern "C" { ...@@ -121,35 +125,152 @@ extern "C" {
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val)) #define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val)) #define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval)) #define atomic_val_compare_exchange_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval)) #define atomic_val_compare_exchange_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval)) #define atomic_val_compare_exchange_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval)) #define atomic_val_compare_exchange_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval)) #define atomic_val_compare_exchange_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char interlocked_add_8(char volatile *ptr, char val); char interlocked_add_fetch_8(char volatile *ptr, char val);
short interlocked_add_16(short volatile *ptr, short val); short interlocked_add_fetch_16(short volatile *ptr, short val);
long interlocked_add_32(long volatile *ptr, long val); long interlocked_add_fetch_32(long volatile *ptr, long val);
__int64 interlocked_add_64(__int64 volatile *ptr, __int64 val); __int64 interlocked_add_fetch_64(__int64 volatile *ptr, __int64 val);
#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val)) #define atomic_add_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), (char)(val))
#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val)) #define atomic_add_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), (short)(val))
#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val)) #define atomic_add_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), (long)(val))
#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val)) #define atomic_add_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_add_fetch_ptr atomic_add_fetch_64
#else
#define atomic_add_fetch_ptr atomic_add_fetch_32
#endif
#define atomic_fetch_add_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_add_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_add_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), (long)(val))
#define atomic_fetch_add_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_fetch_add_ptr atomic_fetch_add_64
#else
#define atomic_fetch_add_ptr atomic_fetch_add_32
#endif
#define atomic_sub_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), -(char)(val))
#define atomic_sub_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), -(short)(val))
#define atomic_sub_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), -(long)(val))
#define atomic_sub_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), -(__int64)(val))
#ifdef _WIN64
#define atomic_sub_fetch_ptr atomic_sub_fetch_64
#else
#define atomic_sub_fetch_ptr atomic_sub_fetch_32
#endif
#define atomic_fetch_sub_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), -(char)(val))
#define atomic_fetch_sub_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), -(short)(val))
#define atomic_fetch_sub_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), -(long)(val))
#define atomic_fetch_sub_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), -(__int64)(val))
#ifdef _WIN64
#define atomic_fetch_sub_ptr atomic_fetch_sub_64
#else
#define atomic_fetch_sub_ptr atomic_fetch_sub_32
#endif
char interlocked_and_fetch_8(char volatile* ptr, char val);
short interlocked_and_fetch_16(short volatile* ptr, short val);
long interlocked_and_fetch_32(long volatile* ptr, long val);
__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val);
#define atomic_and_fetch_8(ptr, val) interlocked_and_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_and_fetch_16(ptr, val) interlocked_and_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_and_fetch_32(ptr, val) interlocked_and_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_and_fetch_64(ptr, val) interlocked_and_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_and_fetch_ptr atomic_and_fetch_64
#else
#define atomic_and_fetch_ptr atomic_and_fetch_32
#endif
#define atomic_fetch_and_8(ptr, val) _InterlockedAnd8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_and_16(ptr, val) _InterlockedAnd16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_and_32(ptr, val) _InterlockedAnd((long volatile*)(ptr), (long)(val))
#ifdef _M_IX86
__int64 interlocked_fetch_and_64(__int64 volatile* ptr, __int64 val);
#define atomic_fetch_and_64(ptr, val) interlocked_fetch_and_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_and_64(ptr, val) _InterlockedAnd64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_and_ptr atomic_fetch_and_64
#else
#define atomic_fetch_and_ptr atomic_fetch_and_32
#endif
char interlocked_or_fetch_8(char volatile* ptr, char val);
short interlocked_or_fetch_16(short volatile* ptr, short val);
long interlocked_or_fetch_32(long volatile* ptr, long val);
__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val);
#define atomic_or_fetch_8(ptr, val) interlocked_or_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_or_fetch_16(ptr, val) interlocked_or_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_or_fetch_32(ptr, val) interlocked_or_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_or_fetch_64(ptr, val) interlocked_or_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_or_fetch_ptr atomic_or_fetch_64
#else
#define atomic_or_fetch_ptr atomic_or_fetch_32
#endif
#define atomic_fetch_or_8(ptr, val) _InterlockedOr8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_or_16(ptr, val) _InterlockedOr16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_or_32(ptr, val) _InterlockedOr((long volatile*)(ptr), (long)(val))
#ifdef _M_IX86
__int64 interlocked_fetch_or_64(__int64 volatile* ptr, __int64 val);
#define atomic_fetch_or_64(ptr, val) interlocked_fetch_or_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_or_64(ptr, val) _InterlockedOr64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_or_ptr atomic_fetch_or_64
#else
#define atomic_fetch_or_ptr atomic_fetch_or_32
#endif
char interlocked_xor_fetch_8(char volatile* ptr, char val);
short interlocked_xor_fetch_16(short volatile* ptr, short val);
long interlocked_xor_fetch_32(long volatile* ptr, long val);
__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val);
#define atomic_xor_fetch_8(ptr, val) interlocked_xor_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_xor_fetch_16(ptr, val) interlocked_xor_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_xor_fetch_32(ptr, val) interlocked_xor_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_xor_fetch_64(ptr, val) interlocked_xor_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64 #ifdef _WIN64
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_64 #define atomic_xor_fetch_ptr atomic_xor_fetch_64
#else #else
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_32 #define atomic_xor_fetch_ptr atomic_xor_fetch_32
#endif #endif
#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val)) #define atomic_fetch_xor_8(ptr, val) _InterlockedXor8((char volatile*)(ptr), (char)(val))
#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val)) #define atomic_fetch_xor_16(ptr, val) _InterlockedXor16((short volatile*)(ptr), (short)(val))
#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val)) #define atomic_fetch_xor_32(ptr, val) _InterlockedXor((long volatile*)(ptr), (long)(val))
#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val))
#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val))
int32_t __sync_val_load_32(int32_t *ptr); #ifdef _M_IX86
void __sync_val_restore_32(int32_t *ptr, int32_t newval); __int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val);
#define atomic_fetch_xor_64(ptr, val) interlocked_fetch_xor_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_xor_64(ptr, val) _InterlockedXor64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_xor_ptr atomic_fetch_xor_64
#else
#define atomic_fetch_xor_ptr atomic_fetch_xor_32
#endif
#define SWAP(a, b, c) \ #define SWAP(a, b, c) \
do { \ do { \
......
...@@ -66,31 +66,143 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle ...@@ -66,31 +66,143 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return setsockopt(socketfd, level, optname, optval, optlen); return setsockopt(socketfd, level, optname, optval, optlen);
} }
// add
char interlocked_add_8(char volatile* ptr, char val) { char interlocked_add_fetch_8(char volatile* ptr, char val) {
return _InterlockedExchangeAdd8(ptr, val) + val; return _InterlockedExchangeAdd8(ptr, val) + val;
} }
short interlocked_add_16(short volatile* ptr, short val) { short interlocked_add_fetch_16(short volatile* ptr, short val) {
return _InterlockedExchangeAdd16(ptr, val) + val; return _InterlockedExchangeAdd16(ptr, val) + val;
} }
long interlocked_add_32(long volatile* ptr, long val) { long interlocked_add_fetch_32(long volatile* ptr, long val) {
return _InterlockedExchangeAdd(ptr, val) + val; return _InterlockedExchangeAdd(ptr, val) + val;
} }
__int64 interlocked_add_64(__int64 volatile* ptr, __int64 val) { __int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) {
return _InterlockedExchangeAdd64(ptr, val) + val; return _InterlockedExchangeAdd64(ptr, val) + val;
} }
int32_t __sync_val_load_32(int32_t *ptr) { // and
return InterlockedOr(ptr, 0); char interlocked_and_fetch_8(char volatile* ptr, char val) {
return _InterlockedAnd8(ptr, val) & val;
}
short interlocked_and_fetch_16(short volatile* ptr, short val) {
return _InterlockedAnd16(ptr, val) & val;
}
long interlocked_and_fetch_32(long volatile* ptr, long val) {
return _InterlockedAnd(ptr, val) & val;
}
#ifndef _M_IX86
__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val) {
return _InterlockedAnd64(ptr, val) & val;
}
#else
__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val) {
__int64 old, res;
do {
old = *ptr;
res = old & val;
} while(_InterlockedCompareExchange64(ptr, res, old) != old);
return res;
}
__int64 interlocked_fetch_and_64(__int64 volatile* ptr, __int64 val) {
__int64 old;
do {
old = *ptr;
} while(_InterlockedCompareExchange64(ptr, old & val, old) != old);
return old;
}
#endif
// or
char interlocked_or_fetch_8(char volatile* ptr, char val) {
return _InterlockedOr8(ptr, val) | val;
}
short interlocked_or_fetch_16(short volatile* ptr, short val) {
return _InterlockedOr16(ptr, val) | val;
}
long interlocked_or_fetch_32(long volatile* ptr, long val) {
return _InterlockedOr(ptr, val) | val;
}
#ifndef _M_IX86
__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val) {
return _InterlockedOr64(ptr, val) & val;
}
#else
__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val) {
__int64 old, res;
do {
old = *ptr;
res = old | val;
} while(_InterlockedCompareExchange64(ptr, res, old) != old);
return res;
}
__int64 interlocked_fetch_or_64(__int64 volatile* ptr, __int64 val) {
__int64 old;
do {
old = *ptr;
} while(_InterlockedCompareExchange64(ptr, old | val, old) != old);
return old;
}
#endif
// xor
char interlocked_xor_fetch_8(char volatile* ptr, char val) {
return _InterlockedXor8(ptr, val) ^ val;
}
short interlocked_xor_fetch_16(short volatile* ptr, short val) {
return _InterlockedXor16(ptr, val) ^ val;
}
long interlocked_xor_fetch_32(long volatile* ptr, long val) {
return _InterlockedXor(ptr, val) ^ val;
}
#ifndef _M_IX86
__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val) {
return _InterlockedXor64(ptr, val) ^ val;
} }
void __sync_val_restore_32(int32_t *ptr, int32_t newval) { #else
InterlockedCompareExchange(ptr, *ptr, newval);
__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val) {
__int64 old, res;
do {
old = *ptr;
res = old ^ val;
} while(_InterlockedCompareExchange64(ptr, res, old) != old);
return res;
}
__int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val) {
__int64 old;
do {
old = *ptr;
} while(_InterlockedCompareExchange64(ptr, old ^ val, old) != old);
return old;
} }
#endif
void tsPrintOsInfo() {} void tsPrintOsInfo() {}
char *taosCharsetReplace(char *charsetstr) { char *taosCharsetReplace(char *charsetstr) {
......
...@@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) { ...@@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) {
pHeader->spi = 0; pHeader->spi = 0;
pHeader->tcp = 0; pHeader->tcp = 0;
pHeader->encrypt = 0; pHeader->encrypt = 0;
pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId; pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId; pHeader->destId = pConn->peerId;
...@@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) { ...@@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) {
pHeader->spi = 0; pHeader->spi = 0;
pHeader->tcp = 0; pHeader->tcp = 0;
pHeader->encrypt = 0; pHeader->encrypt = 0;
pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1);
pHeader->sourceId = pConn->ownId; pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId; pHeader->destId = pConn->peerId;
......
...@@ -218,15 +218,15 @@ typedef struct { ...@@ -218,15 +218,15 @@ typedef struct {
* Only the QInfo.signature == QInfo, this structure can be released safely. * Only the QInfo.signature == QInfo, this structure can be released safely.
*/ */
#define TSDB_QINFO_QUERY_FLAG 0x1 #define TSDB_QINFO_QUERY_FLAG 0x1
#define TSDB_QINFO_RESET_SIG(x) ((x)->signature = (uint64_t)(x)) #define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
#define TSDB_QINFO_SET_QUERY_FLAG(x) \ #define TSDB_QINFO_SET_QUERY_FLAG(x) \
__sync_val_compare_and_swap(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
// live lock: wait for query reaching a safe-point, release all resources // live lock: wait for query reaching a safe-point, release all resources
// belongs to this query // belongs to this query
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ #define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
{ \ { \
while (__sync_val_compare_and_swap(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
taosMsleep(1); \ taosMsleep(1); \
} \ } \
} }
......
...@@ -33,8 +33,8 @@ ...@@ -33,8 +33,8 @@
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverflow" #pragma GCC diagnostic ignored "-Woverflow"
SModule tsModule[TSDB_MOD_MAX]; SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus; uint32_t tsModuleStatus = 0;
pthread_mutex_t dmutex; pthread_mutex_t dmutex;
extern int vnodeSelectReqNum; extern int vnodeSelectReqNum;
extern int vnodeInsertReqNum; extern int vnodeInsertReqNum;
...@@ -216,8 +216,8 @@ void dnodeResetSystem() { ...@@ -216,8 +216,8 @@ void dnodeResetSystem() {
void dnodeCountRequest(SCountInfo *info) { void dnodeCountRequest(SCountInfo *info) {
httpGetReqCount(&info->httpReqNum); httpGetReqCount(&info->httpReqNum);
info->selectReqNum = __sync_fetch_and_and(&vnodeSelectReqNum, 0); info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0);
info->insertReqNum = __sync_fetch_and_and(&vnodeInsertReqNum, 0); info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0);
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -922,8 +922,8 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -922,8 +922,8 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
taosSendMsgToPeer(pConn->thandle, pStart, msgLen); taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
if (rowsToRead == 0) { if (rowsToRead == 0) {
int64_t oldSign = __sync_val_compare_and_swap(&pShow->signature, (uint64_t)pShow, 0); uintptr_t oldSign = atomic_val_compare_exchange_ptr(&pShow->signature, pShow, 0);
if (oldSign != (uint64_t)pShow) { if (oldSign != (uintptr_t)pShow) {
return msgLen; return msgLen;
} }
// pShow->signature = 0; // pShow->signature = 0;
...@@ -1093,8 +1093,8 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { ...@@ -1093,8 +1093,8 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
} }
void mgmtEstablishConn(SConnObj *pConn) { void mgmtEstablishConn(SConnObj *pConn) {
__sync_fetch_and_add(&mgmtShellConns, 1); atomic_fetch_add_32(&mgmtShellConns, 1);
__sync_fetch_and_add(&sdbExtConns, 1); atomic_fetch_add_32(&sdbExtConns, 1);
pConn->stime = taosGetTimestampMs(); pConn->stime = taosGetTimestampMs();
if (strcmp(pConn->pUser->user, "root") == 0 || strcmp(pConn->pUser->user, pConn->pAcct->user) == 0) { if (strcmp(pConn->pUser->user, "root") == 0 || strcmp(pConn->pUser->user, pConn->pAcct->user) == 0) {
...@@ -1168,8 +1168,8 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -1168,8 +1168,8 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pConn->pAcct) { if (pConn->pAcct) {
mgmtRemoveConnFromAcct(pConn); mgmtRemoveConnFromAcct(pConn);
__sync_fetch_and_sub(&mgmtShellConns, 1); atomic_fetch_sub_32(&mgmtShellConns, 1);
__sync_fetch_and_sub(&sdbExtConns, 1); atomic_fetch_sub_32(&sdbExtConns, 1);
} }
code = 0; code = 0;
...@@ -1227,8 +1227,8 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -1227,8 +1227,8 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
if (msg == NULL) { if (msg == NULL) {
if (pConn) { if (pConn) {
mgmtRemoveConnFromAcct(pConn); mgmtRemoveConnFromAcct(pConn);
__sync_fetch_and_sub(&mgmtShellConns, 1); atomic_fetch_sub_32(&mgmtShellConns, 1);
__sync_fetch_and_sub(&sdbExtConns, 1); atomic_fetch_sub_32(&sdbExtConns, 1);
mTrace("connection from %s is closed", pConn->pUser->user); mTrace("connection from %s is closed", pConn->pUser->user);
memset(pConn, 0, sizeof(SConnObj)); memset(pConn, 0, sizeof(SConnObj));
} }
......
...@@ -256,7 +256,7 @@ void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count) { ...@@ -256,7 +256,7 @@ void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count) {
tslot = (tslot + 1) % pInfo->maxBlocks; tslot = (tslot + 1) % pInfo->maxBlocks;
} }
__sync_fetch_and_add(&pObj->freePoints, pObj->pointsPerBlock * slots); atomic_fetch_add_32(&pObj->freePoints, pObj->pointsPerBlock * slots);
pInfo->commitSlot = slot; pInfo->commitSlot = slot;
pInfo->commitPoint = pos; pInfo->commitPoint = pos;
pObj->commitCount = count; pObj->commitCount = count;
...@@ -505,7 +505,7 @@ int vnodeInsertPointToCache(SMeterObj *pObj, char *pData) { ...@@ -505,7 +505,7 @@ int vnodeInsertPointToCache(SMeterObj *pObj, char *pData) {
pData += pObj->schema[col].bytes; pData += pObj->schema[col].bytes;
} }
__sync_fetch_and_sub(&pObj->freePoints, 1); atomic_fetch_sub_32(&pObj->freePoints, 1);
pCacheBlock->numOfPoints++; pCacheBlock->numOfPoints++;
pPool->count++; pPool->count++;
...@@ -1114,7 +1114,7 @@ int vnodeSyncRestoreCache(int vnode, int fd) { ...@@ -1114,7 +1114,7 @@ int vnodeSyncRestoreCache(int vnode, int fd) {
for (int col = 0; col < pObj->numOfColumns; ++col) for (int col = 0; col < pObj->numOfColumns; ++col)
if (taosReadMsg(fd, pBlock->offset[col], pObj->schema[col].bytes * points) <= 0) return -1; if (taosReadMsg(fd, pBlock->offset[col], pObj->schema[col].bytes * points) <= 0) return -1;
__sync_fetch_and_sub(&pObj->freePoints, points); atomic_fetch_sub_32(&pObj->freePoints, points);
blocksReceived++; blocksReceived++;
pointsReceived += points; pointsReceived += points;
pObj->lastKey = *((TSKEY *)(pBlock->offset[0] + pObj->schema[0].bytes * (points - 1))); pObj->lastKey = *((TSKEY *)(pBlock->offset[0] + pObj->schema[0].bytes * (points - 1)));
......
...@@ -410,7 +410,7 @@ void vnodeRemoveFile(int vnode, int fileId) { ...@@ -410,7 +410,7 @@ void vnodeRemoveFile(int vnode, int fileId) {
int fd = open(headName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); int fd = open(headName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd > 0) { if (fd > 0) {
vnodeGetHeadFileHeaderInfo(fd, &headInfo); vnodeGetHeadFileHeaderInfo(fd, &headInfo);
__sync_fetch_and_add(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage); atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage);
close(fd); close(fd);
} }
......
...@@ -497,7 +497,7 @@ int vnodeImportToFile(SImportInfo *pImport) { ...@@ -497,7 +497,7 @@ int vnodeImportToFile(SImportInfo *pImport) {
pInfo->commitPoint = 0; pInfo->commitPoint = 0;
pCacheBlock->numOfPoints = points; pCacheBlock->numOfPoints = points;
if (slot == pInfo->currentSlot) { if (slot == pInfo->currentSlot) {
__sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint); atomic_fetch_add_32(&pObj->freePoints, pInfo->commitPoint);
} }
} else { } else {
// if last block is full and committed // if last block is full and committed
...@@ -625,7 +625,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { ...@@ -625,7 +625,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
} }
code = 0; code = 0;
__sync_fetch_and_sub(&pObj->freePoints, rows); atomic_fetch_sub_32(&pObj->freePoints, rows);
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows);
_exit: _exit:
......
...@@ -643,8 +643,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -643,8 +643,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pData += pObj->bytesPerPoint; pData += pObj->bytesPerPoint;
points++; points++;
} }
__sync_fetch_and_add(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1)); atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1));
__sync_fetch_and_add(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint); atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint);
pthread_mutex_lock(&(pVnode->vmutex)); pthread_mutex_lock(&(pVnode->vmutex));
......
...@@ -3890,14 +3890,14 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { ...@@ -3890,14 +3890,14 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
if (pSupporter == NULL || pSupporter->numOfMeters == 1) { if (pSupporter == NULL || pSupporter->numOfMeters == 1) {
__sync_fetch_and_sub(&pQInfo->pObj->numOfQueries, 1); atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else { } else {
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterObj *pMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[i]->sid); SMeterObj *pMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[i]->sid);
__sync_fetch_and_sub(&(pMeter->numOfQueries), 1); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) { if (pMeter->numOfQueries > 0) {
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid, dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
......
...@@ -365,7 +365,7 @@ _query_over: ...@@ -365,7 +365,7 @@ _query_over:
vnodeFreeColumnInfo(&pQueryMsg->colList[i]); vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
} }
__sync_fetch_and_add(&vnodeSelectReqNum, 1); atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return ret; return ret;
} }
...@@ -599,6 +599,6 @@ _submit_over: ...@@ -599,6 +599,6 @@ _submit_over:
// for import, send the submit response only when return code is not zero // for import, send the submit response only when return code is not zero
if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints); if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
__sync_fetch_and_add(&vnodeInsertReqNum, 1); atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret; return ret;
} }
...@@ -351,7 +351,7 @@ void vnodeCalcOpenVnodes() { ...@@ -351,7 +351,7 @@ void vnodeCalcOpenVnodes() {
openVnodes++; openVnodes++;
} }
__sync_val_compare_and_swap(&tsOpenVnodes, tsOpenVnodes, openVnodes); atomic_store_32(&tsOpenVnodes, openVnodes);
} }
void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables) { void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables) {
......
...@@ -567,7 +567,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid ...@@ -567,7 +567,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
* check if the numOfQueries is 0 or not. * check if the numOfQueries is 0 or not.
*/ */
pMeterObjList[(*numOfInc)++] = pMeter; pMeterObjList[(*numOfInc)++] = pMeter;
__sync_fetch_and_add(&pMeter->numOfQueries, 1); atomic_fetch_add_32(&pMeter->numOfQueries, 1);
// output for meter more than one query executed // output for meter more than one query executed
if (pMeter->numOfQueries > 1) { if (pMeter->numOfQueries > 1) {
...@@ -591,7 +591,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, ...@@ -591,7 +591,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
SMeterObj* pMeter = pMeterObjList[i]; SMeterObj* pMeter = pMeterObjList[i];
if (pMeter != NULL) { // here, do not need to lock to perform operations if (pMeter != NULL) { // here, do not need to lock to perform operations
__sync_fetch_and_sub(&pMeter->numOfQueries, 1); atomic_fetch_sub_32(&pMeter->numOfQueries, 1);
if (pMeter->numOfQueries > 0) { if (pMeter->numOfQueries > 0) {
dTrace("qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid, dTrace("qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
...@@ -646,7 +646,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { ...@@ -646,7 +646,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
} }
int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) { int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) {
return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state); return atomic_val_compare_exchange_32(&pMeterObj->state, TSDB_METER_STATE_READY, state);
} }
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) { void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) {
......
...@@ -516,7 +516,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k ...@@ -516,7 +516,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
pNewNode->addTime = taosGetTimestampMs(); pNewNode->addTime = taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + keepTime; pNewNode->time = pNewNode->addTime + keepTime;
__sync_add_and_fetch_32(&pNewNode->refCount, 1); atomic_add_fetch_32(&pNewNode->refCount, 1);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer // the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosUpdateInHashTable(pObj, pNewNode); taosUpdateInHashTable(pObj, pNewNode);
...@@ -529,7 +529,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k ...@@ -529,7 +529,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
return NULL; return NULL;
} }
__sync_add_and_fetch_32(&pNewNode->refCount, 1); atomic_add_fetch_32(&pNewNode->refCount, 1);
assert(hashVal == (*pObj->hashFp)(key, keyLen - 1)); assert(hashVal == (*pObj->hashFp)(key, keyLen - 1));
pNewNode->hashVal = hashVal; pNewNode->hashVal = hashVal;
...@@ -558,7 +558,7 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui ...@@ -558,7 +558,7 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui
return NULL; return NULL;
} }
__sync_add_and_fetch_32(&pNode->refCount, 1); atomic_add_fetch_32(&pNode->refCount, 1);
pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1); pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1);
taosAddNodeToHashTable(pObj, pNode); taosAddNodeToHashTable(pObj, pNode);
...@@ -616,7 +616,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) { ...@@ -616,7 +616,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
} }
if (pNode->refCount > 0) { if (pNode->refCount > 0) {
__sync_sub_and_fetch_32(&pNode->refCount, 1); atomic_sub_fetch_32(&pNode->refCount, 1);
pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount); pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
} else { } else {
/* /*
...@@ -676,20 +676,20 @@ void *taosGetDataFromCache(void *handle, char *key) { ...@@ -676,20 +676,20 @@ void *taosGetDataFromCache(void *handle, char *key) {
SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen); SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen);
if (ptNode != NULL) { if (ptNode != NULL) {
__sync_add_and_fetch_32(&ptNode->refCount, 1); atomic_add_fetch_32(&ptNode->refCount, 1);
} }
__cache_unlock(pObj); __cache_unlock(pObj);
if (ptNode != NULL) { if (ptNode != NULL) {
__sync_add_and_fetch_32(&pObj->statistics.hitCount, 1); atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount); pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount);
} else { } else {
__sync_add_and_fetch_32(&pObj->statistics.missCount, 1); atomic_add_fetch_32(&pObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key); pTrace("key:%s not in cache,retrieved failed", key);
} }
__sync_add_and_fetch_32(&pObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? ptNode->data : NULL; return (ptNode != NULL) ? ptNode->data : NULL;
} }
......
...@@ -60,7 +60,7 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -60,7 +60,7 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
strcat(tmpPath, fileNamePrefix); strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%u-%u"); strcat(tmpPath, "-%u-%u");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), __sync_add_and_fetch_32(&tmpFileSerialNum, 1)); snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1));
} }
/* /*
......
...@@ -364,7 +364,7 @@ void tsReadLogOption(char *option, char *value) { ...@@ -364,7 +364,7 @@ void tsReadLogOption(char *option, char *value) {
} }
} }
SGlobalConfig *tsGetConfigOption(char *option) { SGlobalConfig *tsGetConfigOption(const char *option) {
tsInitGlobalConfig(); tsInitGlobalConfig();
for (int i = 0; i < tsGlobalConfigNum; ++i) { for (int i = 0; i < tsGlobalConfigNum; ++i) {
SGlobalConfig *cfg = tsGlobalConfig + i; SGlobalConfig *cfg = tsGlobalConfig + i;
...@@ -374,7 +374,7 @@ SGlobalConfig *tsGetConfigOption(char *option) { ...@@ -374,7 +374,7 @@ SGlobalConfig *tsGetConfigOption(char *option) {
return NULL; return NULL;
} }
void tsReadConfigOption(char *option, char *value) { void tsReadConfigOption(const char *option, char *value) {
for (int i = 0; i < tsGlobalConfigNum; ++i) { for (int i = 0; i < tsGlobalConfigNum; ++i) {
SGlobalConfig *cfg = tsGlobalConfig + i; SGlobalConfig *cfg = tsGlobalConfig + i;
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_CONFIG)) continue; if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_CONFIG)) continue;
...@@ -423,9 +423,7 @@ void tsInitConfigOption(SGlobalConfig *cfg, char *name, void *ptr, int8_t valTyp ...@@ -423,9 +423,7 @@ void tsInitConfigOption(SGlobalConfig *cfg, char *name, void *ptr, int8_t valTyp
cfg->cfgStatus = TSDB_CFG_CSTATUS_NONE; cfg->cfgStatus = TSDB_CFG_CSTATUS_NONE;
} }
void tsInitGlobalConfig() { static void doInitGlobalConfig() {
if (tsGlobalConfig != NULL) return;
tsGlobalConfig = (SGlobalConfig *) malloc(sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM); tsGlobalConfig = (SGlobalConfig *) malloc(sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM);
memset(tsGlobalConfig, 0, sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM); memset(tsGlobalConfig, 0, sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM);
...@@ -783,6 +781,11 @@ void tsInitGlobalConfig() { ...@@ -783,6 +781,11 @@ void tsInitGlobalConfig() {
tsGlobalConfigNum = (int)(cfg - tsGlobalConfig); tsGlobalConfigNum = (int)(cfg - tsGlobalConfig);
} }
static pthread_once_t initGlobalConfig = PTHREAD_ONCE_INIT;
void tsInitGlobalConfig() {
pthread_once(&initGlobalConfig, doInitGlobalConfig);
}
void tsReadGlobalLogConfig() { void tsReadGlobalLogConfig() {
tsInitGlobalConfig(); tsInitGlobalConfig();
......
...@@ -381,7 +381,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) ...@@ -381,7 +381,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
} }
if (taosLogMaxLines > 0) { if (taosLogMaxLines > 0) {
__sync_add_and_fetch_32(&taosLogLines, 1); atomic_add_fetch_32(&taosLogLines, 1);
if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile(); if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile();
} }
...@@ -458,7 +458,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f ...@@ -458,7 +458,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f
taosPushLogBuffer(logHandle, buffer, len); taosPushLogBuffer(logHandle, buffer, len);
if (taosLogMaxLines > 0) { if (taosLogMaxLines > 0) {
__sync_add_and_fetch_32(&taosLogLines, 1); atomic_add_fetch_32(&taosLogLines, 1);
if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile(); if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile();
} }
......
...@@ -105,15 +105,15 @@ static timer_map_t timerMap; ...@@ -105,15 +105,15 @@ static timer_map_t timerMap;
static uintptr_t getNextTimerId() { static uintptr_t getNextTimerId() {
uintptr_t id; uintptr_t id;
do { do {
id = __sync_add_and_fetch_ptr(&nextTimerId, 1); id = atomic_add_fetch_ptr(&nextTimerId, 1);
} while (id == 0); } while (id == 0);
return id; return id;
} }
static void timerAddRef(tmr_obj_t* timer) { __sync_add_and_fetch_8(&timer->refCount, 1); } static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); }
static void timerDecRef(tmr_obj_t* timer) { static void timerDecRef(tmr_obj_t* timer) {
if (__sync_sub_and_fetch_8(&timer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
free(timer); free(timer);
} }
} }
...@@ -121,7 +121,7 @@ static void timerDecRef(tmr_obj_t* timer) { ...@@ -121,7 +121,7 @@ static void timerDecRef(tmr_obj_t* timer) {
static void lockTimerList(timer_list_t* list) { static void lockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
int i = 0; int i = 0;
while (__sync_val_compare_and_swap_64(&(list->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
sched_yield(); sched_yield();
} }
...@@ -130,7 +130,7 @@ static void lockTimerList(timer_list_t* list) { ...@@ -130,7 +130,7 @@ static void lockTimerList(timer_list_t* list) {
static void unlockTimerList(timer_list_t* list) { static void unlockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) { if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
assert(false); assert(false);
tmrError("%d trying to unlock a timer list not locked by current thread.", tid); tmrError("%d trying to unlock a timer list not locked by current thread.", tid);
} }
...@@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) { ...@@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
static void processExpiredTimer(void* handle, void* arg) { static void processExpiredTimer(void* handle, void* arg) {
tmr_obj_t* timer = (tmr_obj_t*)handle; tmr_obj_t* timer = (tmr_obj_t*)handle;
timer->executedBy = taosGetPthreadId(); timer->executedBy = taosGetPthreadId();
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
if (state == TIMER_STATE_WAITING) { if (state == TIMER_STATE_WAITING) {
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start."; const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
...@@ -431,7 +431,7 @@ bool taosTmrStop(tmr_h timerId) { ...@@ -431,7 +431,7 @@ bool taosTmrStop(tmr_h timerId) {
return false; return false;
} }
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
doStopTimer(timer, state); doStopTimer(timer, state);
timerDecRef(timer); timerDecRef(timer);
...@@ -456,7 +456,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, ...@@ -456,7 +456,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
if (timer == NULL) { if (timer == NULL) {
tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id); tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id);
} else { } else {
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (!doStopTimer(timer, state)) { if (!doStopTimer(timer, state)) {
timerDecRef(timer); timerDecRef(timer);
timer = NULL; timer = NULL;
......
文件模式从 100644 更改为 100755
...@@ -7,8 +7,15 @@ ...@@ -7,8 +7,15 @@
#include <lualib.h> #include <lualib.h>
#include <taos.h> #include <taos.h>
static int l_connect(lua_State *L) struct cb_param{
{ lua_State* state;
int callback;
void * stream;
};
static int l_connect(lua_State *L){
TAOS * taos; TAOS * taos;
char *host = lua_tostring(L, 1); char *host = lua_tostring(L, 1);
char *user = lua_tostring(L, 2); char *user = lua_tostring(L, 2);
...@@ -29,6 +36,7 @@ static int l_connect(lua_State *L) ...@@ -29,6 +36,7 @@ static int l_connect(lua_State *L)
lua_pushstring(L, taos_errstr(taos)); lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error"); lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL); lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "conn");
}else{ }else{
printf("success to connect server\n"); printf("success to connect server\n");
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
...@@ -49,7 +57,7 @@ static int l_query(lua_State *L){ ...@@ -49,7 +57,7 @@ static int l_query(lua_State *L){
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
printf("receive command:%s\r\n",s); // printf("receive command:%s\r\n",s);
if(taos_query(taos, s)!=0){ if(taos_query(taos, s)!=0){
printf("failed, reason:%s\n", taos_errstr(taos)); printf("failed, reason:%s\n", taos_errstr(taos));
lua_pushnumber(L, -1); lua_pushnumber(L, -1);
...@@ -78,8 +86,12 @@ static int l_query(lua_State *L){ ...@@ -78,8 +86,12 @@ static int l_query(lua_State *L){
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256]; char temp[256];
int affectRows = taos_affected_rows(taos);
// printf(" affect rows:%d\r\n", affectRows);
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code"); lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
lua_newtable(L); lua_newtable(L);
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
...@@ -95,7 +107,7 @@ static int l_query(lua_State *L){ ...@@ -95,7 +107,7 @@ static int l_query(lua_State *L){
} }
lua_pushstring(L,fields[i].name); lua_pushstring(L,fields[i].name);
//printf("field name:%s,type:%d\n",fields[i].name,fields[i].type);
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
...@@ -142,6 +154,115 @@ static int l_query(lua_State *L){ ...@@ -142,6 +154,115 @@ static int l_query(lua_State *L){
return 1; return 1;
} }
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
printf("\n\r-----------------------------------------------------------------------------------\n");
// printf("r:%d, L:%d\n",p->callback, p->state);
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
for (int i = 0; i < numFields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L, -3);
}
lua_call(L, 1, 0);
printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = lua_topointer(L,1);
char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS * taos= lua_topointer(L,1); TAOS * taos= lua_topointer(L,1);
lua_newtable(L); lua_newtable(L);
...@@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = { ...@@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = {
{"connect", l_connect}, {"connect", l_connect},
{"query", l_query}, {"query", l_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -35,10 +35,12 @@ if res.code ~=0 then ...@@ -35,10 +35,12 @@ if res.code ~=0 then
return return
end end
res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')") res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
else
print("insert successfully, affected:"..res.affected)
end end
res = driver.query(conn,"select * from m1") res = driver.query(conn,"select * from m1")
...@@ -55,4 +57,69 @@ else ...@@ -55,4 +57,69 @@ else
end end
end end
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)")
if res.code ~=0 then
print(res.error)
return
end
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)")
if res.code ~=0 then
print(res.error)
return
end
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
if res.code ~=0 then
print(res.error)
return
else
print("insert successfully, affected:"..res.affected)
end
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type")
if res.code ~=0 then
print("select error:"..res.error)
return
else
print("in lua, result:")
for i = 1, #(res.item) do
print("res:"..res.item[i].count)
end
end
function callback(t)
print("continuous query result:")
for key, value in pairs(t) do
print("key:"..key..", value:"..value)
end
end
local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
if res.code ~=0 then
print("open stream error:"..res.error)
return
else
print("openstream ok")
stream = res.stream
end
--From now on we begin continous query in an definite (infinite if you want) loop.
local loop_index = 0
while loop_index < 20 do
local t = os.time()*1000
local v = loop_index
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v))
if res.code ~=0 then
print(res.error)
return
else
print("insert successfully, affected:"..res.affected)
end
os.execute("sleep " .. 1)
loop_index = loop_index + 1
end
driver.close_stream(stream)
driver.close(conn) driver.close(conn)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册