提交 41176072 编写于 作者: S slguan

Merge remote-tracking branch 'origin/develop' into feature/mergecode

# Conflicts:
#	src/client/src/TSDBJNIConnector.c
......@@ -38,21 +38,56 @@ IF (NOT DEFINED TD_CLUSTER)
# Set macro definitions according to os platform
SET(TD_LINUX_64 FALSE)
SET(TD_LINUX_32 FALSE)
SET(TD_ARM FALSE)
SET(TD_ARM_64 FALSE)
SET(TD_ARM_32 FALSE)
SET(TD_MIPS_64 FALSE)
SET(TD_DARWIN_64 FALSE)
SET(TD_WINDOWS_64 FALSE)
# if generate ARM version:
# cmake -DARMVER=arm32 .. or cmake -DARMVER=arm64
IF (${ARMVER} MATCHES "arm32")
SET(TD_ARM TRUE)
SET(TD_ARM_32 TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_32_)
ELSEIF (${ARMVER} MATCHES "arm64")
SET(TD_ARM TRUE)
SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_64_)
ENDIF ()
IF (TD_ARM)
ADD_DEFINITIONS(-D_TD_ARM_)
IF (TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_32_)
ELSEIF (TD_ARM_64)
ADD_DEFINITIONS(-D_TD_ARM_64_)
ELSE ()
EXIT ()
ENDIF ()
ENDIF ()
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_LINUX_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
ADD_DEFINITIONS(-D_M_X64)
MESSAGE(STATUS "The current platform is Linux 64-bit")
ELSE ()
ELSEIF (${CMAKE_SIZEOF_VOID_P} MATCHES 4)
IF (TD_ARM)
SET(TD_LINUX_32 TRUE)
MESSAGE(FATAL_ERROR "The current platform is Linux 32-bit, not supported yet")
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
#ADD_DEFINITIONS(-D_M_IX86)
MESSAGE(STATUS "The current platform is Linux 32-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux 32-bit, but no ARM not supported yet")
EXIT ()
ENDIF ()
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux neither 32-bit nor 64-bit, not supported yet")
EXIT ()
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
......@@ -102,13 +137,27 @@ IF (NOT DEFINED TD_CLUSTER)
IF (TD_LINUX_64)
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -Wno-char-subscripts -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -Wno-char-subscripts -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
......@@ -169,6 +218,14 @@ IF (NOT DEFINED TD_CLUSTER)
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector)
......
......@@ -8,7 +8,7 @@ C/C++ APIs are similar to the MySQL APIs. Applications should include TDengine h
```C
#include <taos.h>
```
Make sure TDengine library _libtaos.so_ is installed and use _-ltaos_ option to link the library when compiling. The return values of all APIs are _-1_ or _NULL_ for failure.
Make sure TDengine library _libtaos.so_ is installed and use _-ltaos_ option to link the library when compiling. In most cases, if the return value of an API is integer, it return _0_ for success and other values as an error code for failure; if the return value is pointer, then _NULL_ is used for failure.
### C/C++ sync API
......@@ -78,6 +78,51 @@ The 12 APIs are the most important APIs frequently used. Users can check _taos.h
**Note**: The connection to a TDengine server is not multi-thread safe. So a connection can only be used by one thread.
### C/C++ parameter binding API
TDengine also provides parameter binding APIs, like MySQL, only question mark `?` can be used to represent a parameter in these APIs.
- `TAOS_STMT* taos_stmt_init(TAOS *taos)`
Create a TAOS_STMT to represent the prepared statement for other APIs.
- `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)`
Parse SQL statement _sql_ and bind result to _stmt_ , if _length_ larger than 0, its value is used to determine the length of _sql_, the API auto detects the actual length of _sql_ otherwise.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
Bind values to parameters. _bind_ points to an array, the element count and sequence of the array must be identical as the parameters of the SQL statement. The usage of _TAOS_BIND_ is same as _MYSQL_BIND_ in MySQL, its definition is as below:
```c
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
unsigned long buffer_length; // not used in TDengine
unsigned long *length;
int * is_null;
int is_unsigned; // not used in TDengine
int * error; // not used in TDengine
} TAOS_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
Add bound parameters to batch, client can call `taos_stmt_bind_param` again after calling this API. Note this API only support _insert_ / _import_ statements, it returns an error in other cases.
- `int taos_stmt_execute(TAOS_STMT *stmt)`
Execute the prepared statement. This API can only be called once for a statement at present.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Acquire the result set of an executed statement. The usage of the result is same as `taos_use_result`, `taos_free_result` must be called after one you are done with the result set to release resources.
- `int taos_stmt_close(TAOS_STMT *stmt)`
Close the statement, release all resources.
### C/C++ async API
In addition to sync APIs, TDengine also provides async APIs, which are more efficient. Async APIs are returned right away without waiting for a response from the server, allowing the application to continute with other tasks without blocking. So async APIs are more efficient, especially useful when in a poor network.
......
......@@ -4,13 +4,13 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、JAVA、
## C/C++ Connector
C/C++的API类似于MySQL的C API。应用程序使用时,需要包含TDengine头文件 _taos.h_(安装后,位于_/usr/local/taos/include_):
C/C++的API类似于MySQL的C API。应用程序使用时,需要包含TDengine头文件 _taos.h_(安装后,位于 _/usr/local/taos/include_):
```C
#include <taos.h>
```
在编译时需要链接TDengine动态库_libtaos.so_(安装后,位于/usr/local/taos/driver,gcc编译时,请加上 -ltaos)。 所有API都以返回_-1_或_NULL_均表示失败。
在编译时需要链接TDengine动态库 _libtaos.so_ (安装后,位于 _/usr/local/taos/driver_,gcc编译时,请加上 -ltaos)。 如未特别说明,当API的返回值是整数时,_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
### C/C++同步API
......@@ -79,6 +79,51 @@ C/C++的API类似于MySQL的C API。应用程序使用时,需要包含TDengine
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该链接调用API,否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
### C/C++ 参数绑定接口
除了直接调用 `taos_query` 进行查询,TDengine也提供了支持参数绑定的Prepare API,与 MySQL 一样,这些API目前也仅支持用问号`?`来代表待绑定的参数,具体如下:
- `TAOS_STMT* taos_stmt_init(TAOS *taos)`
创建一个 TAOS_STMT 对象用于后续调用。
- `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)`
解析一条sql语句,将解析结果和参数信息绑定到stmt上,如果参数length大于0,将使用此此参数作为sql语句的长度,如等于0,将自动判断sql语句的长度。
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
进行参数绑定,bind指向一个数组,需保证此数组的元素数量和顺序与sql语句中的参数完全一致。TAOS_BIND 的使用方法与 MySQL中的 MYSQL_BIND 一致,具体定义如下:
```c
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
unsigned long buffer_length; // 未实际使用
unsigned long *length;
int * is_null;
int is_unsigned; // 未实际使用
int * error; // 未实际使用
} TAOS_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
将当前绑定的参数加入批处理中,调用此函数后,可以再次调用`taos_stmt_bind_param`绑定新的参数。需要注意,此函数仅支持 insert/import 语句,如果是select等其他SQL语句,将返回错误。
- `int taos_stmt_execute(TAOS_STMT *stmt)`
执行准备好的语句。目前,一条语句只能执行一次。
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
获取语句的结果集。结果集的使用方式与非参数化调用时一致,使用完成后,应对此结果集调用 `taos_free_result`以释放资源。
- `int taos_stmt_close(TAOS_STMT *stmt)`
执行完毕,释放所有资源。
### C/C++异步API
同步API之外,TDengine还提供性能更高的异步调用API处理数据插入、查询操作。在软硬件环境相同的情况下,异步API处理数据插入的速度比同步API快2~4倍。异步API采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步API在网络延迟严重的情况下,优点尤为突出。
......
......@@ -7,7 +7,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
AUX_SOURCE_DIRECTORY(./src SRC)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
# set the static lib name
......
......@@ -479,6 +479,7 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
int32_t tscBuildResultsForEmptyRetrieval(SSqlObj *pSql);
bool tscIsUpdateQuery(STscObj *pObj);
// transfer SSqlInfo to SqlCmd struct
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
......
......@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdbool.h>
#include "os.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
#include "taos.h"
......@@ -291,6 +289,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
return JNI_CONNECTION_NULL;
}
<<<<<<< HEAD
<<<<<<< Updated upstream
int num_fields = taos_field_count(tscon);
if (num_fields != 0) {
......@@ -298,19 +297,27 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
jniTrace("jobj:%p, taos:%p, get resultset:%p", jobj, tscon, (void *)ret);
return ret;
=======
=======
>>>>>>> origin/develop
jlong ret = 0;
if (tscIsUpdateQuery(tscon)) {
ret = 0; // for update query, no result pointer
<<<<<<< HEAD
jniTrace("jobj:%p, conn:%p, no result", jobj, tscon);
} else {
ret = (jlong) taos_use_result(tscon);
jniTrace("jobj:%p, conn:%p, get resultset:%p", jobj, tscon, (void *) ret);
>>>>>>> Stashed changes
=======
jniTrace("jobj:%p, taos:%p, no result", jobj, tscon);
} else {
ret = (jlong) taos_use_result(tscon);
jniTrace("jobj:%p, taos:%p, get resultset:%p", jobj, tscon, (void *) ret);
>>>>>>> origin/develop
}
jniTrace("jobj:%p, taos:%p, no resultset", jobj, tscon);
return 0;
return ret;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con,
......
......@@ -13,12 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <float.h>
#include <math.h>
#include <stdbool.h>
#include <stdlib.h>
#include "os.h"
#include "taosmsg.h"
#include "tast.h"
......@@ -261,8 +255,7 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha
t0 = tStrGetToken(str, i, false, 0, NULL);
if (t0.n == 0 || t0.type == TK_RP) {
if (pLeft->nodeType != TSQL_NODE_EXPR) {
// if left is not the expr, it is not a legal expr
if (pLeft->nodeType != TSQL_NODE_EXPR) { // if left is not the expr, it is not a legal expr
tSQLSyntaxNodeDestroy(pLeft, NULL);
return NULL;
}
......@@ -326,13 +319,13 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha
pn->colId = -1;
return pn;
} else {
int32_t optr = getBinaryExprOptr(&t0);
if (optr <= 0) {
uint8_t localOptr = getBinaryExprOptr(&t0);
if (localOptr <= 0) {
pError("not support binary operator:%d", t0.type);
return NULL;
}
return parseRemainStr(str, pBinExpr, pSchema, optr, numOfCols, i);
return parseRemainStr(str, pBinExpr, pSchema, localOptr, numOfCols, i);
}
}
......
......@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "tlog.h"
#include "trpc.h"
......
......@@ -13,14 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
......
......@@ -15,16 +15,6 @@
#pragma GCC diagnostic ignored "-Wincompatible-pointer-types"
#include <assert.h>
#include <ctype.h>
#include <fcntl.h>
#include <float.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <wctype.h>
#include "os.h"
#include "taosmsg.h"
#include "tast.h"
......@@ -3786,10 +3776,11 @@ static void getStatics_i64(int64_t *primaryKey, int64_t *data, int32_t numOfRow,
static void getStatics_f(int64_t *primaryKey, float *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = DBL_MAX;
*max = -DBL_MAX;
*minIndex = 0;
*maxIndex = 0;
float fmin = DBL_MAX;
float fmax = -DBL_MAX;
float fminIndex = 0;
float fmaxIndex = 0;
double dsum = 0;
assert(numOfRow <= INT16_MAX);
......@@ -3799,15 +3790,19 @@ static void getStatics_f(int64_t *primaryKey, float *data, int32_t numOfRow, dou
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)(&(data[i]));
//*sum += data[i];
dsum += fv;
if (fmin > fv) {
fmin = fv;
fminIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
if (fmax < fv) {
fmax = fv;
fmaxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_FLOAT)) {
......@@ -3819,14 +3814,26 @@ static void getStatics_f(int64_t *primaryKey, float *data, int32_t numOfRow, dou
// lastVal = data[i];
// }
}
double csum = 0;
*(int64_t*)(&csum) = *(int64_t*)sum;
csum += dsum;
*(int64_t*)(sum) = *(int64_t*)(&csum);
*(int32_t*)max = *(int32_t*)(&fmax);
*(int32_t*)min = *(int32_t*)(&fmin);
*(int32_t*)minIndex = *(int32_t*)(&fminIndex);
*(int32_t*)maxIndex = *(int32_t*)(&fmaxIndex);
}
static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, double *min, double *max, double *sum,
int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = DBL_MAX;
*max = -DBL_MAX;
*minIndex = 0;
*maxIndex = 0;
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
double dminIndex = 0;
double dmaxIndex = 0;
double dsum = 0;
assert(numOfRow <= INT16_MAX);
......@@ -3839,15 +3846,19 @@ static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, do
continue;
}
*sum += data[i];
if (*min > data[i]) {
*min = data[i];
*minIndex = i;
double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)(&(data[i]));
//*sum += data[i];
dsum += dv;
if (dmin > dv) {
dmin = dv;
dminIndex = i;
}
if (*max < data[i]) {
*max = data[i];
*maxIndex = i;
if (dmax < dv) {
dmax = dv;
dmaxIndex = i;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_DOUBLE)) {
......@@ -3859,6 +3870,16 @@ static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, do
// lastVal = data[i];
// }
}
double csum = 0;
*(int64_t*)(&csum) = *(int64_t*)sum;
csum += dsum;
*(int64_t*)(sum) = *(int64_t*)(&csum);
*(int64_t*)max = *(int64_t*)(&dmax);
*(int64_t*)min = *(int64_t*)(&dmin);
*(int64_t*)minIndex = *(int64_t*)(&dminIndex);
*(int64_t*)maxIndex = *(int64_t*)(&dmaxIndex);
}
void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max,
......
......@@ -13,13 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <tsclient.h>
#include <sys/stat.h>
#include "os.h"
#include "tcache.h"
#include "tscJoinProcess.h"
......
......@@ -13,9 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
......
......@@ -21,22 +21,8 @@
#pragma GCC diagnostic ignored "-Woverflow"
#pragma GCC diagnostic ignored "-Wunused-variable"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <assert.h>
#include <float.h>
#include <math.h>
#include <string.h>
#include <time.h>
#include <wchar.h>
#include "ihash.h"
#include "os.h"
#include "ihash.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
......
......@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "taos.h"
#include "tsclient.h"
#include "tsql.h"
......
......@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "tlog.h"
#include "tsclient.h"
......@@ -23,6 +20,27 @@
#include "ttimer.h"
#include "tutil.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL;
bool tscSlowQueryConnInitialized = false;
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, int port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void tscInitConnCb(void *param, TAOS_RES *result, int code) {
char *sql = param;
if (code < 0) {
tscError("taos:%p, slow query connect failed, code:%d", tscSlowQueryConn, code);
taos_close(tscSlowQueryConn);
tscSlowQueryConn = NULL;
tscSlowQueryConnInitialized = false;
free(sql);
} else {
tscTrace("taos:%p, slow query connect success, code:%d", tscSlowQueryConn, code);
tscSlowQueryConnInitialized = true;
tscSaveSlowQueryFp(sql, NULL);
}
}
void tscAddIntoSqlList(SSqlObj *pSql) {
static uint32_t queryId = 1;
......@@ -47,33 +65,35 @@ void tscAddIntoSqlList(SSqlObj *pSql) {
void tscSaveSlowQueryFpCb(void *param, TAOS_RES *result, int code) {
if (code < 0) {
tscError("failed to save slowquery, code:%d", code);
tscError("failed to save slow query, code:%d", code);
} else {
tscTrace("success to save slow query, code:%d", code);
}
}
void tscSaveSlowQueryFp(void *handle, void *tmrId) {
char *sql = handle;
static void *taos = NULL;
if (taos == NULL) {
taos = taos_connect(NULL, "monitor", tsInternalPass, NULL, 0);
if (taos == NULL) {
tscError("failed to save slow query, can't connect to server");
if (!tscSlowQueryConnInitialized) {
if (tscSlowQueryConn == NULL) {
tscTrace("start to init slow query connect");
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, tscInitConnCb, sql, &tscSlowQueryConn);
} else {
tscError("taos:%p, slow query connect is already initialized", tscSlowQueryConn);
free(sql);
return;
}
}
tscTrace("save slow query:sql", sql);
taos_query_a(taos, sql, tscSaveSlowQueryFpCb, NULL);
} else {
tscTrace("taos:%p, save slow query:%s", tscSlowQueryConn, sql);
taos_query_a(tscSlowQueryConn, sql, tscSaveSlowQueryFpCb, NULL);
free(sql);
}
}
void tscSaveSlowQuery(SSqlObj *pSql) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L;
if (pSql->res.useconds < SLOW_QUERY_INTERVAL) return;
tscTrace("%p query time:%ld sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
tscTrace("%p query time:%lld sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
char *sql = malloc(200);
int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %lld, %lld, '", tsMonitorDbName,
......
......@@ -13,13 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <stdarg.h>
#include "os.h"
#include "tglobalcfg.h"
#include "tsql.h"
......
......@@ -13,10 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include "os.h"
#include "taosmsg.h"
#include "tschemautil.h"
......
......@@ -13,13 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <tsclient.h>
#include "tlosertree.h"
#include "os.h"
#include "tlosertree.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
......
......@@ -13,12 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <wchar.h>
#include "os.h"
#include "tcache.h"
#include "trpc.h"
......@@ -2848,7 +2842,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
return msgLen;
}
int tscEstimateBuildHeartBeatMsgLength(SSqlObj *pSql) {
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
int size = 0;
STscObj *pObj = pSql->pTscObj;
......@@ -2881,7 +2875,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql) {
pthread_mutex_lock(&pObj->mutex);
size = tscEstimateBuildHeartBeatMsgLength(pSql);
size = tscEstimateHeartBeatMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for heartbeat msg", pSql);
return -1;
......@@ -3613,7 +3607,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ??
} else {
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql,
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql,
pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
pMeterMetaInfo->pMeterMeta);
}
......
......@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "tcache.h"
#include "tlog.h"
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <signal.h>
#include "os.h"
#include "shash.h"
#include "taos.h"
......
......@@ -13,10 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
#include "tscSyntaxtreefunction.h"
#include "tsql.h"
......
......@@ -13,15 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <locale.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
......
......@@ -13,10 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <math.h>
#include <time.h>
#include "os.h"
#include "ihash.h"
#include "taosmsg.h"
......@@ -1821,3 +1817,17 @@ int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid) {
return pTagCond->joinInfo.right.tagCol;
}
}
bool tscIsUpdateQuery(STscObj* pObj) {
if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
}
SSqlCmd* pCmd = &(pObj->pSql->cmd);
if (pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) {
return 1;
}
return 0;
}
......@@ -29,7 +29,7 @@ import (
"unsafe"
)
func (mc *taosConn) taosConnect(ip, user, pass, db string, port int) (taos unsafe.Pointer, err error){
func (mc *taosConn) taosConnect(ip, user, pass, db string, port int) (taos unsafe.Pointer, err error) {
cuser := C.CString(user)
cpass := C.CString(pass)
cip := C.CString(ip)
......@@ -48,7 +48,7 @@ func (mc *taosConn) taosConnect(ip, user, pass, db string, port int) (taos unsaf
}
func (mc *taosConn) taosQuery(sqlstr string) (int, error) {
taosLog.Printf("taosQuery() input sql:%s\n", sqlstr)
//taosLog.Printf("taosQuery() input sql:%s\n", sqlstr)
csqlstr := C.CString(sqlstr)
defer C.free(unsafe.Pointer(csqlstr))
......@@ -58,6 +58,7 @@ func (mc *taosConn) taosQuery(sqlstr string) (int, error) {
mc.taos_error()
errStr := C.GoString(C.taos_errstr(mc.taos))
taosLog.Println("taos_query() failed:", errStr)
taosLog.Printf("taosQuery() input sql:%s\n", sqlstr)
return 0, errors.New(errStr)
}
......
......@@ -134,7 +134,7 @@ public class TSDBJNIConnector {
}
}
// Try retrieving result set for the executed SQLusing the current connection pointer. If the executed
// Try retrieving result set for the executed SQL using the current connection pointer. If the executed
// SQL is a DML/DDL which doesn't return a result set, then taosResultSetPointer should be 0L. Otherwise,
// taosResultSetPointer should be a non-zero value.
taosResultSetPointer = this.getResultSetImp(this.taos);
......
......@@ -122,9 +122,6 @@ void taos_close_stream(TAOS_STREAM *tstr);
int taos_load_table_info(TAOS *taos, const char* tableNameList);
// TODO: `configDir` should not be declared here
extern char configDir[]; // the path to global configuration
#ifdef __cplusplus
}
#endif
......
......@@ -169,6 +169,8 @@ extern uint32_t debugFlag;
extern uint32_t odbcdebugFlag;
extern uint32_t qdebugFlag;
extern uint32_t taosMaxTmrCtrl;
extern int tsRpcTimer;
extern int tsRpcMaxTime;
extern int tsUdpDelay;
......
......@@ -21,40 +21,41 @@ extern "C" {
#endif
typedef void *tmr_h;
typedef void (*TAOS_TMR_CALLBACK)(void *, void *);
extern uint32_t tmrDebugFlag;
extern int taosTmrThreads;
extern uint32_t taosMaxTmrCtrl;
#define tmrError(...) \
if (tmrDebugFlag & DEBUG_ERROR) { \
do { if (tmrDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \
}
} } while(0)
#define tmrWarn(...) \
if (tmrDebugFlag & DEBUG_WARN) { \
do { if (tmrDebugFlag & DEBUG_WARN) { \
tprintf("WARN TMR ", tmrDebugFlag, __VA_ARGS__); \
}
} } while(0)
#define tmrTrace(...) \
if (tmrDebugFlag & DEBUG_TRACE) { \
do { if (tmrDebugFlag & DEBUG_TRACE) { \
tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \
}
} } while(0)
#define MAX_NUM_OF_TMRCTL 512
#define MSECONDS_PER_TICK 5
void *taosTmrInit(int maxTmr, int resoultion, int longest, char *label);
void *taosTmrInit(int maxTmr, int resoultion, int longest, const char *label);
tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle);
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle);
void taosTmrStop(tmr_h tmrId);
bool taosTmrStop(tmr_h tmrId);
void taosTmrStopA(tmr_h *timerId);
bool taosTmrStopA(tmr_h *timerId);
void taosTmrReset(void (*fp)(void *, void *), int mseconds, void *param1, void *handle, tmr_h *pTmrId);
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle, tmr_h *pTmrId);
void taosTmrCleanUp(void *handle);
void taosTmrList(void *handle);
#ifdef __cplusplus
}
#endif
......
......@@ -89,7 +89,7 @@ extern "C" {
} else { \
return (x) < (y) ? -1 : 1; \
} \
} while (0);
} while (0)
#define GET_INT8_VAL(x) (*(int8_t *)(x))
#define GET_INT16_VAL(x) (*(int16_t *)(x))
......@@ -169,8 +169,6 @@ int32_t taosInitTimer(void (*callback)(int), int32_t ms);
*/
uint32_t MurmurHash3_32(const void *key, int32_t len);
bool taosCheckDbName(char *db, char *monitordb);
bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len);
bool taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
......
......@@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(./src SRC)
LIST(REMOVE_ITEM SRC ./src/shellWindows.c)
ADD_EXECUTABLE(shell ${SRC})
......
......@@ -445,7 +445,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BIGINT:
printf("%*lld|", l[i], *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_
float fv = 0;
//memcpy(&fv, row[i], sizeof(float));
......@@ -454,8 +454,9 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
printf("%*.5f|", l[i], *((float *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_
double dv = 0;
//memcpy(&dv, row[i], sizeof(double));
......@@ -464,6 +465,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
printf("%*.9f|", l[i], *((double *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -528,7 +530,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BIGINT:
printf("%lld\n", *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_
float fv = 0;
//memcpy(&fv, row[i], sizeof(float));
......@@ -537,8 +539,9 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
printf("%.5f\n", *((float *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_
double dv = 0;
//memcpy(&dv, row[i], sizeof(double));
......@@ -547,6 +550,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
printf("%.9f\n", *((double *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -614,7 +618,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BIGINT:
fprintf(fp, "%lld", *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_
float fv = 0;
//memcpy(&fv, row[i], sizeof(float));
......@@ -623,8 +627,9 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
fprintf(fp, "%.5f", *((float *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_
double dv = 0;
//memcpy(&dv, row[i], sizeof(double));
......@@ -633,6 +638,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
#else
fprintf(fp, "%.9f", *((double *)row[i]));
#endif
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......
......@@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosdemo ${SRC})
TARGET_LINK_LIBRARIES(taosdemo taos_static)
......
......@@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosdump ${SRC})
TARGET_LINK_LIBRARIES(taosdump taos_static)
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
......
......@@ -92,7 +92,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
return false;
}
bool us = taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO;
int precision = taos_result_precision(result);
// such as select count(*) from sys.cpu
// such as select count(*) from sys.cpu group by ipaddr
......@@ -151,7 +151,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
snprintf(target, HTTP_GC_TARGET_SIZE, "%s%s", aliasBuffer, (char *)row[groupFields]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (us) {
if (precision == TSDB_TIME_PRECISION_MILLI) {
snprintf(target, HTTP_GC_TARGET_SIZE, "%s%ld", aliasBuffer, *((int64_t *) row[groupFields]));
} else {
snprintf(target, HTTP_GC_TARGET_SIZE, "%s%ld", aliasBuffer, *((int64_t *) row[groupFields]) / 1000);
......@@ -210,7 +210,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonStringForTransMean(jsonBuf, row[i], fields[i].bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (precision == TSDB_TIME_PRECISION_MILLI) { //ms
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else {
httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
}
break;
default:
httpJsonString(jsonBuf, "invalidcol", 10);
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
......
......@@ -55,10 +55,44 @@
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_PLATFORM_LINUX_H
#define TDENGINE_PLATFORM_LINUX_H
......@@ -25,10 +25,12 @@ extern "C" {
#include <arpa/inet.h>
#include <assert.h>
#include <dirent.h>
#include <endian.h>
#include <float.h>
#include <ifaddrs.h>
#include <limits.h>
#include <locale.h>
#include <math.h>
#include <netdb.h>
#include <netinet/in.h>
......@@ -37,7 +39,8 @@ extern "C" {
#include <netinet/udp.h>
#include <pthread.h>
#include <pwd.h>
#include <stdbool.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
......@@ -45,21 +48,19 @@ extern "C" {
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <sys/un.h>
#include <syslog.h>
#include <termios.h>
#include <unistd.h>
#include <wchar.h>
#include <wordexp.h>
#include <locale.h>
#include <dirent.h>
#define taosCloseSocket(x) \
{ \
......@@ -71,14 +72,43 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
......@@ -29,6 +29,7 @@
#include <math.h>
#include <string.h>
#include <assert.h>
#include <intrin.h>
#ifdef __cplusplus
extern "C" {
......@@ -78,12 +79,75 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) send(fd, buf, len, 0)
#define taosReadSocket(fd, buf, len) recv(fd, buf, len, 0)
int32_t __sync_val_compare_and_swap_32(int32_t *ptr, int32_t oldval, int32_t newval);
int32_t __sync_add_and_fetch_32(int32_t *ptr, int32_t val);
int32_t __sync_sub_and_fetch_32(int32_t *ptr, int32_t val);
int64_t __sync_val_compare_and_swap_64(int64_t *ptr, int64_t oldval, int64_t newval);
int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val);
int64_t __sync_sub_and_fetch_64(int64_t *ptr, int64_t val);
#if defined(_M_ARM) || defined(_M_ARM64)
#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr))
#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr))
#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr))
#define atomic_load_64(ptr) __iso_volatile_load64((const volatile __int64*)(ptr))
#define atomic_store_8(ptr, val) __iso_volatile_store8((volatile __int8*)(ptr), (__int8)(val))
#define atomic_store_16(ptr, val) __iso_volatile_store16((volatile __int16*)(ptr), (__int16)(val))
#define atomic_store_32(ptr, val) __iso_volatile_store32((volatile __int32*)(ptr), (__int32)(val))
#define atomic_store_64(ptr, val) __iso_volatile_store64((volatile __int64*)(ptr), (__int64)(val))
#ifdef _M_ARM64
#define atomic_load_ptr atomic_load_64
#define atomic_store_ptr atomic_store_64
#else
#define atomic_load_ptr atomic_load_32
#define atomic_store_ptr atomic_store_32
#endif
#else
#define atomic_load_8(ptr) (*(char volatile*)(ptr))
#define atomic_load_16(ptr) (*(short volatile*)(ptr))
#define atomic_load_32(ptr) (*(long volatile*)(ptr))
#define atomic_load_64(ptr) (*(__int64 volatile*)(ptr))
#define atomic_load_ptr(ptr) (*(void* volatile*)(ptr))
#define atomic_store_8(ptr, val) ((*(char volatile*)(ptr)) = (char)(val))
#define atomic_store_16(ptr, val) ((*(short volatile*)(ptr)) = (short)(val))
#define atomic_store_32(ptr, val) ((*(long volatile*)(ptr)) = (long)(val))
#define atomic_store_64(ptr, val) ((*(__int64 volatile*)(ptr)) = (__int64)(val))
#define atomic_store_ptr(ptr, val) ((*(void* volatile*)(ptr)) = (void*)(val))
#endif
#define atomic_exchange_8(ptr, val) _InterlockedExchange8((char volatile*)(ptr), (char)(val))
#define atomic_exchange_16(ptr, val) _InterlockedExchange16((short volatile*)(ptr), (short)(val))
#define atomic_exchange_32(ptr, val) _InterlockedExchange((long volatile*)(ptr), (long)(val))
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char interlocked_add_8(char volatile *ptr, char val);
short interlocked_add_16(short volatile *ptr, short val);
long interlocked_add_32(long volatile *ptr, long val);
__int64 interlocked_add_64(__int64 volatile *ptr, __int64 val);
#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val))
#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val))
#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val))
#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_64
#else
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_32
#endif
#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val))
#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val))
#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val))
#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val))
#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val))
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
......@@ -43,8 +43,11 @@ void taosResetPthread(pthread_t *thread) {
}
int64_t taosGetPthreadId() {
pthread_t id = pthread_self();
return (int64_t)id.p;
#ifdef PTW32_VERSION
return pthread_getw32threadid_np(pthread_self());
#else
return (int64_t)pthread_self();
#endif
}
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
......@@ -63,28 +66,21 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return setsockopt(socketfd, level, optname, optval, optlen);
}
int32_t __sync_val_compare_and_swap_32(int32_t *ptr, int32_t oldval, int32_t newval) {
return InterlockedCompareExchange(ptr, newval, oldval);
}
int32_t __sync_add_and_fetch_32(int32_t *ptr, int32_t val) {
return InterlockedAdd(ptr, val);
}
int32_t __sync_sub_and_fetch_32(int32_t *ptr, int32_t val) {
return InterlockedAdd(ptr, -val);
char interlocked_add_8(char volatile* ptr, char val) {
return _InterlockedExchangeAdd8(ptr, val) + val;
}
int64_t __sync_val_compare_and_swap_64(int64_t *ptr, int64_t oldval, int64_t newval) {
return InterlockedCompareExchange64(ptr, newval, oldval);
short interlocked_add_16(short volatile* ptr, short val) {
return _InterlockedExchangeAdd16(ptr, val) + val;
}
int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val) {
return InterlockedAdd64(ptr, val);
long interlocked_add_32(long volatile* ptr, long val) {
return _InterlockedExchangeAdd(ptr, val) + val;
}
int64_t __sync_sub_and_fetch_64(int64_t *ptr, int64_t val) {
return InterlockedAdd64(ptr, -val);
__int64 interlocked_add_64(__int64 volatile* ptr, __int64 val) {
return _InterlockedExchangeAdd64(ptr, val) + val;
}
int32_t __sync_val_load_32(int32_t *ptr) {
......
......@@ -5,7 +5,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(./src SRC)
ELSEIF (TD_DARWIN_64)
LIST(APPEND SRC ./src/thaship.c)
......
......@@ -5,7 +5,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(sdb ${SRC})
TARGET_LINK_LIBRARIES(sdb trpc)
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc)
......
......@@ -30,6 +30,7 @@ char* mgmtMeterGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema);
int32_t mgmtFindTagCol(STabObj * pMetric, const char * tagName);
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
int32_t mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pInfo, int32_t tableIndex, tQueryResultset* pRes);
int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes);
......
......@@ -14,9 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmt.h"
#include <arpa/inet.h>
#include "mgmtBalance.h"
#include "mgmtUtil.h"
#include "tschemautil.h"
void *dbSdb = NULL;
......@@ -373,10 +375,12 @@ int mgmtDropDbByName(SAcctObj *pAcct, char *name) {
if (pDb == NULL) {
mWarn("db:%s is not there", name);
// return TSDB_CODE_INVALID_DB;
return 0;
return TSDB_CODE_SUCCESS;
}
if (taosCheckDbName(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
}
return mgmtDropDb(pDb);
}
......
......@@ -688,8 +688,10 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
pAcct = mgmtGetAcct(pDb->cfg.acct);
// 0.sys
if (taosCheckDbName(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
// 0.log
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
}
if (mgmtIsNormalMeter(pMeter)) {
return dropMeterImp(pDb, pMeter, pAcct);
......@@ -719,8 +721,8 @@ int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter) {
return TSDB_CODE_INVALID_TABLE;
}
// 0.sys
if (taosCheckDbName(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
// 0.log
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
if (!mgmtIsNormalMeter(pMeter) || !mgmtMeterCreateFromMetric(pMeter)) {
......@@ -833,6 +835,7 @@ static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pMeter) {
}
}
tSkipListDestroyKey(&key);
if (num != 0) {
free(pRes);
}
......
......@@ -1140,7 +1140,7 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) {
SAcctObj * pAcct = NULL;
SUserObj * pUser = NULL;
SDbObj * pDb = NULL;
char dbName[TSDB_METER_ID_LEN];
char dbName[256] = {0};
pConnectMsg = (SConnectMsg *)pMsg;
......@@ -1158,7 +1158,6 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pAcct = mgmtGetAcct(pUser->acct);
if (pConnectMsg->db[0]) {
memset(dbName, 0, sizeof(dbName));
sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
pDb = mgmtGetDb(dbName);
if (pDb == NULL) {
......
......@@ -87,3 +87,10 @@ int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col) { // length before col
return len;
}
bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
char dbName[TSDB_DB_NAME_LEN + 1] = {0};
extractDBName(db, dbName);
return (strncasecmp(dbName, monitordb, strlen(dbName)) == 0);
}
......@@ -3436,9 +3436,18 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
if (pQuery->interpoType == TSDB_INTERPO_SET_VALUE) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// only the function of interp needs the corresponding information
if (pCtx->functionId != TSDB_FUNC_INTERP) {
continue;
}
pCtx->numOfParams = 4;
SInterpInfo * pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
// for primary timestamp column, set the flag
......
......@@ -866,7 +866,8 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
pQInfo->pointsRead);
if (pQInfo->over == 0) {
dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
/*
......
......@@ -419,7 +419,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
if (code == TSDB_CODE_SUCCESS) {
pRsp->offset = htobe64(vnodeGetOffsetVal(pRetrieve->qhandle));
pRsp->useconds = ((SQInfo *)(pRetrieve->qhandle))->useconds;
pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds);
} else {
pRsp->offset = 0;
pRsp->useconds = 0;
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/modules/http/inc)
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
IF (TD_LINUX_64)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread os m rt)
......
......@@ -1353,7 +1353,7 @@ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) {
#endif // #ifndef _TD_ARM_
void taosResolveCRC() {
#ifndef _TD_ARM_32
#ifndef _TD_ARM_
int sse42;
SSE42(sse42);
crc32c = sse42 ? crc32c_hw : crc32c_sf;
......
......@@ -536,6 +536,11 @@ void tsInitGlobalConfig() {
0, 2, 0, TSDB_CFG_UTYPE_NONE);
// 0-any, 1-mgmt, 2-dnode
// timer
tsInitConfigOption(cfg++, "maxTmrCtrl", &taosMaxTmrCtrl, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
8, 2048, 0, TSDB_CFG_UTYPE_NONE);
// time
tsInitConfigOption(cfg++, "monitorInterval", &tsMonitorInterval, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG,
......
......@@ -16,555 +16,553 @@
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "os.h"
#include "tidpool.h"
#include "tlog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
// special mempool without mutex
#define mpool_h void *
typedef struct {
int numOfFree; /* number of free slots */
int first; /* the first free slot */
int numOfBlock; /* the number of blocks */
int blockSize; /* block size in bytes */
int * freeList; /* the index list */
char *pool; /* the actual mem block */
} pool_t;
mpool_h tmrMemPoolInit(int maxNum, int blockSize);
char *tmrMemPoolMalloc(mpool_h handle);
void tmrMemPoolFree(mpool_h handle, char *p);
void tmrMemPoolCleanUp(mpool_h handle);
typedef struct _tmr_obj {
void *param1;
void (*fp)(void *, void *);
tmr_h timerId;
short cycle;
struct _tmr_obj * prev;
struct _tmr_obj * next;
int index;
struct _tmr_ctrl_t *pCtrl;
} tmr_obj_t;
typedef struct {
tmr_obj_t *head;
int count;
} tmr_list_t;
typedef struct _tmr_ctrl_t {
void * signature;
pthread_mutex_t mutex; /* mutex to protect critical resource */
int resolution; /* resolution in mseconds */
int numOfPeriods; /* total number of periods */
int64_t periodsFromStart; /* count number of periods since start */
pthread_t thread; /* timer thread ID */
tmr_list_t * tmrList;
mpool_h poolHandle;
char label[12];
int maxNumOfTmrs;
int numOfTmrs;
int ticks;
int maxTicks;
int tmrCtrlId;
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3
typedef union _tmr_ctrl_t {
char label[16];
struct {
// pad to ensure 'next' is the end of this union
char padding[16 - sizeof(union _tmr_ctrl_t*)];
union _tmr_ctrl_t* next;
};
} tmr_ctrl_t;
typedef struct tmr_obj_t {
uintptr_t id;
tmr_ctrl_t* ctrl;
struct tmr_obj_t* mnext;
struct tmr_obj_t* prev;
struct tmr_obj_t* next;
uint16_t slot;
uint8_t wheel;
uint8_t state;
uint8_t refCount;
uint8_t reserved1;
uint16_t reserved2;
union {
int64_t expireAt;
int64_t executedBy;
};
TAOS_TMR_CALLBACK fp;
void* param;
} tmr_obj_t;
typedef struct timer_list_t {
int64_t lockedBy;
tmr_obj_t* timers;
} timer_list_t;
typedef struct timer_map_t {
uint32_t size;
uint32_t count;
timer_list_t* slots;
} timer_map_t;
typedef struct time_wheel_t {
pthread_mutex_t mutex;
int64_t nextScanAt;
uint32_t resolution;
uint16_t size;
uint16_t index;
tmr_obj_t** slots;
} time_wheel_t;
uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE;
void taosTmrProcessList(tmr_ctrl_t *);
uint32_t taosMaxTmrCtrl = 512;
static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tmrCtrlMutex;
static tmr_ctrl_t* tmrCtrls;
static tmr_ctrl_t* unusedTmrCtrl = NULL;
static void* tmrQhandle;
static int numOfTmrCtrl = 0;
tmr_ctrl_t tmrCtrl[MAX_NUM_OF_TMRCTL];
int numOfTmrCtrl = 0;
void * tmrIdPool = NULL;
void * tmrQhandle;
int taosTmrThreads = 1;
void taosTimerLoopFunc(int signo) {
tmr_ctrl_t *pCtrl;
int count = 0;
static uintptr_t nextTimerId = 0;
static time_wheel_t wheels[] = {
{.resolution = MSECONDS_PER_TICK, .size = 4096},
{.resolution = 1000, .size = 1024},
{.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;
static uintptr_t getNextTimerId() {
uintptr_t id;
do {
id = __sync_add_and_fetch_ptr(&nextTimerId, 1);
} while (id == 0);
return id;
}
static void timerAddRef(tmr_obj_t* timer) { __sync_add_and_fetch_8(&timer->refCount, 1); }
for (int i = 1; i < MAX_NUM_OF_TMRCTL; ++i) {
pCtrl = tmrCtrl + i;
if (pCtrl->signature) {
count++;
pCtrl->ticks++;
if (pCtrl->ticks >= pCtrl->maxTicks) {
taosTmrProcessList(pCtrl);
pCtrl->ticks = 0;
static void timerDecRef(tmr_obj_t* timer) {
if (__sync_sub_and_fetch_8(&timer->refCount, 1) == 0) {
free(timer);
}
if (count >= numOfTmrCtrl) break;
}
static void lockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId();
int i = 0;
while (__sync_val_compare_and_swap_64(&(list->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) {
sched_yield();
}
}
}
void taosTmrModuleInit(void) {
tmrIdPool = taosInitIdPool(MAX_NUM_OF_TMRCTL);
memset(tmrCtrl, 0, sizeof(tmrCtrl));
static void unlockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId();
if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) {
assert(false);
tmrError("%d trying to unlock a timer list not locked by current thread.", tid);
}
}
static void addTimer(tmr_obj_t* timer) {
timerAddRef(timer);
timer->wheel = tListLen(wheels);
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
uint32_t idx = (uint32_t)(timer->id % timerMap.size);
timer_list_t* list = timerMap.slots + idx;
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
tmrTrace("timer module is initialized, thread:%d", taosTmrThreads);
lockTimerList(list);
timer->mnext = list->timers;
list->timers = timer;
unlockTimerList(list);
}
void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {
static pthread_once_t tmrInit = PTHREAD_ONCE_INIT;
tmr_ctrl_t * pCtrl;
pthread_once(&tmrInit, taosTmrModuleInit);
static tmr_obj_t* findTimer(uintptr_t id) {
tmr_obj_t* timer = NULL;
if (id > 0) {
uint32_t idx = (uint32_t)(id % timerMap.size);
timer_list_t* list = timerMap.slots + idx;
lockTimerList(list);
for (timer = list->timers; timer != NULL; timer = timer->mnext) {
if (timer->id == id) {
timerAddRef(timer);
break;
}
}
unlockTimerList(list);
}
return timer;
}
int tmrCtrlId = taosAllocateId(tmrIdPool);
static void removeTimer(uintptr_t id) {
tmr_obj_t* prev = NULL;
uint32_t idx = (uint32_t)(id % timerMap.size);
timer_list_t* list = timerMap.slots + idx;
lockTimerList(list);
for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
if (p->id == id) {
if (prev == NULL) {
list->timers = p->mnext;
} else {
prev->mnext = p->mnext;
}
timerDecRef(p);
break;
}
prev = p;
}
unlockTimerList(list);
}
if (tmrCtrlId < 0) {
tmrError("%s bug!!! too many timers!!!", label);
return NULL;
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
timerAddRef(timer);
// select a wheel for the timer, we are not an accurate timer,
// but the inaccuracy should not be too large.
timer->wheel = tListLen(wheels) - 1;
for (uint8_t i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
if (delay < wheel->resolution * wheel->size) {
timer->wheel = i;
break;
}
}
pCtrl = tmrCtrl + tmrCtrlId;
tfree(pCtrl->tmrList);
tmrMemPoolCleanUp(pCtrl->poolHandle);
time_wheel_t* wheel = wheels + timer->wheel;
timer->prev = NULL;
timer->expireAt = taosGetTimestampMs() + delay;
memset(pCtrl, 0, sizeof(tmr_ctrl_t));
pthread_mutex_lock(&wheel->mutex);
pCtrl->tmrCtrlId = tmrCtrlId;
strcpy(pCtrl->label, label);
pCtrl->maxNumOfTmrs = maxNumOfTmrs;
uint32_t idx = 0;
if (timer->expireAt > wheel->nextScanAt) {
// adjust delay according to next scan time of this wheel
// so that the timer is not fired earlier than desired.
delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
idx = (delay + wheel->resolution - 1) / wheel->resolution;
}
if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) {
tmrError("%s failed to allocate mem pool", label);
tmrMemPoolCleanUp(pCtrl->poolHandle);
return NULL;
timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
tmr_obj_t* p = wheel->slots[timer->slot];
wheel->slots[timer->slot] = timer;
timer->next = p;
if (p != NULL) {
p->prev = timer;
}
if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK;
pCtrl->resolution = resolution;
pCtrl->maxTicks = resolution / MSECONDS_PER_TICK;
pCtrl->ticks = rand() / pCtrl->maxTicks;
pCtrl->numOfPeriods = longest / resolution;
if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10;
pthread_mutex_unlock(&wheel->mutex);
}
pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods);
if (pCtrl->tmrList == NULL) {
tmrError("%s failed to allocate(size:%d) mem for tmrList", label, sizeof(tmr_list_t) * pCtrl->numOfPeriods);
tmrMemPoolCleanUp(pCtrl->poolHandle);
taosTmrCleanUp(pCtrl);
return NULL;
static bool removeFromWheel(tmr_obj_t* timer) {
if (timer->wheel >= tListLen(wheels)) {
return false;
}
time_wheel_t* wheel = wheels + timer->wheel;
for (int i = 0; i < pCtrl->numOfPeriods; i++) {
pCtrl->tmrList[i].head = NULL;
pCtrl->tmrList[i].count = 0;
bool removed = false;
pthread_mutex_lock(&wheel->mutex);
// other thread may modify timer->wheel, check again.
if (timer->wheel < tListLen(wheels)) {
if (timer->prev != NULL) {
timer->prev->next = timer->next;
}
if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) {
tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno));
taosTmrCleanUp(pCtrl);
return NULL;
if (timer->next != NULL) {
timer->next->prev = timer->prev;
}
if (timer == wheel->slots[timer->slot]) {
wheel->slots[timer->slot] = timer->next;
}
timer->wheel = tListLen(wheels);
timer->next = NULL;
timer->prev = NULL;
timerDecRef(timer);
removed = true;
}
pthread_mutex_unlock(&wheel->mutex);
pCtrl->signature = pCtrl;
numOfTmrCtrl++;
tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId);
return pCtrl;
return removed;
}
void taosTmrProcessList(tmr_ctrl_t *pCtrl) {
unsigned int index;
tmr_list_t * pList;
tmr_obj_t * pObj, *header;
pthread_mutex_lock(&pCtrl->mutex);
index = pCtrl->periodsFromStart % pCtrl->numOfPeriods;
pList = &pCtrl->tmrList[index];
static void processExpiredTimer(void* handle, void* arg) {
tmr_obj_t* timer = (tmr_obj_t*)handle;
timer->executedBy = taosGetPthreadId();
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
if (state == TIMER_STATE_WAITING) {
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
while (1) {
header = pList->head;
if (header == NULL) break;
(*timer->fp)(timer->param, (tmr_h)timer->id);
atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
if (header->cycle > 0) {
pObj = header;
while (pObj) {
pObj->cycle--;
pObj = pObj->next;
}
break;
fmt = "%s timer[id=%lld, fp=%p, param=%p] execution end.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
}
removeTimer(timer->id);
timerDecRef(timer);
}
pCtrl->numOfTmrs--;
tmrTrace("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d", pCtrl->label, header->param1, header->fp,
header, index, pCtrl->numOfTmrs);
static void addToExpired(tmr_obj_t* head) {
const char* fmt = "%s adding expired timer[id=%lld, fp=%p, param=%p] to queue.";
pList->head = header->next;
if (header->next) header->next->prev = NULL;
pList->count--;
header->timerId = NULL;
while (head != NULL) {
uintptr_t id = head->id;
tmr_obj_t* next = head->next;
tmrTrace(fmt, head->ctrl->label, id, head->fp, head->param);
SSchedMsg schedMsg;
schedMsg.fp = NULL;
schedMsg.tfp = header->fp;
schedMsg.ahandle = header->param1;
schedMsg.thandle = header;
schedMsg.tfp = processExpiredTimer;
schedMsg.ahandle = head;
schedMsg.thandle = NULL;
taosScheduleTask(tmrQhandle, &schedMsg);
tmrMemPoolFree(pCtrl->poolHandle, (char *)header);
tmrTrace("timer[id=%lld] has been added to queue.", id);
head = next;
}
pCtrl->periodsFromStart++;
pthread_mutex_unlock(&pCtrl->mutex);
}
void taosTmrCleanUp(void *handle) {
tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;
if (pCtrl == NULL || pCtrl->signature != pCtrl) return;
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
uintptr_t id = getNextTimerId();
timer->id = id;
timer->state = TIMER_STATE_WAITING;
timer->fp = fp;
timer->param = param;
timer->ctrl = ctrl;
addTimer(timer);
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] started";
tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param);
if (mseconds == 0) {
timer->wheel = tListLen(wheels);
timerAddRef(timer);
addToExpired(timer);
} else {
addToWheel(timer, mseconds);
}
pCtrl->signature = NULL;
taosFreeId(tmrIdPool, pCtrl->tmrCtrlId);
numOfTmrCtrl--;
tmrTrace("%s is cleaned up, numOfTmrs:%d", pCtrl->label, numOfTmrCtrl);
// note: use `timer->id` here is unsafe as `timer` may already be freed
return id;
}
tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) {
tmr_obj_t * pObj, *cNode, *pNode;
tmr_list_t *pList;
int index, period;
tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;
if (handle == NULL) return NULL;
period = mseconds / pCtrl->resolution;
if (pthread_mutex_lock(&pCtrl->mutex) != 0)
tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));
pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle);
if (pObj == NULL) {
tmrError("%s reach max number of timers:%d", pCtrl->label, pCtrl->maxNumOfTmrs);
pthread_mutex_unlock(&pCtrl->mutex);
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return NULL;
}
pObj->cycle = period / pCtrl->numOfPeriods;
pObj->param1 = param1;
pObj->fp = fp;
pObj->timerId = pObj;
pObj->pCtrl = pCtrl;
index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods;
int cindex = (pCtrl->periodsFromStart) % pCtrl->numOfPeriods;
pList = &(pCtrl->tmrList[index]);
pObj->index = index;
cNode = pList->head;
pNode = NULL;
while (cNode != NULL) {
if (cNode->cycle < pObj->cycle) {
pNode = cNode;
cNode = cNode->next;
} else {
break;
}
tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
if (timer == NULL) {
tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
return NULL;
}
pObj->next = cNode;
pObj->prev = pNode;
return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
if (cNode != NULL) {
cNode->prev = pObj;
static void taosTimerLoopFunc(int signo) {
int64_t now = taosGetTimestampMs();
for (int i = 0; i < tListLen(wheels); i++) {
// `expried` is a temporary expire list.
// expired timers are first add to this list, then move
// to expired queue as a batch to improve performance.
// note this list is used as a stack in this function.
tmr_obj_t* expired = NULL;
time_wheel_t* wheel = wheels + i;
while (now >= wheel->nextScanAt) {
pthread_mutex_lock(&wheel->mutex);
wheel->index = (wheel->index + 1) % wheel->size;
tmr_obj_t* timer = wheel->slots[wheel->index];
while (timer != NULL) {
tmr_obj_t* next = timer->next;
if (now < timer->expireAt) {
timer = next;
continue;
}
// remove from the wheel
if (timer->prev == NULL) {
wheel->slots[wheel->index] = next;
if (next != NULL) {
next->prev = NULL;
}
if (pNode != NULL) {
pNode->next = pObj;
} else {
pList->head = pObj;
timer->prev->next = next;
if (next != NULL) {
next->prev = timer->prev;
}
pList->count++;
pCtrl->numOfTmrs++;
if (pthread_mutex_unlock(&pCtrl->mutex) != 0)
tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));
tmrTrace("%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d", pCtrl->label, param1, fp, pObj, index,
pCtrl->numOfTmrs, cindex);
return (tmr_h)pObj;
}
void taosTmrStop(tmr_h timerId) {
tmr_obj_t * pObj;
tmr_list_t *pList;
tmr_ctrl_t *pCtrl;
pObj = (tmr_obj_t *)timerId;
if (pObj == NULL) return;
pCtrl = pObj->pCtrl;
if (pCtrl == NULL) return;
if (pthread_mutex_lock(&pCtrl->mutex) != 0)
tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));
if (pObj->timerId == timerId) {
pList = &(pCtrl->tmrList[pObj->index]);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pList->head = pObj->next;
}
timer->wheel = tListLen(wheels);
if (pObj->next) {
pObj->next->prev = pObj->prev;
// add to temporary expire list
timer->next = expired;
timer->prev = NULL;
if (expired != NULL) {
expired->prev = timer;
}
expired = timer;
pList->count--;
pObj->timerId = NULL;
pCtrl->numOfTmrs--;
tmrTrace("%s %p, timer stopped, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj,
pCtrl->numOfTmrs);
tmrMemPoolFree(pCtrl->poolHandle, (char *)(pObj));
timer = next;
}
pthread_mutex_unlock(&wheel->mutex);
wheel->nextScanAt += wheel->resolution;
}
pthread_mutex_unlock(&pCtrl->mutex);
addToExpired(expired);
}
}
void taosTmrStopA(tmr_h *timerId) {
tmr_obj_t * pObj;
tmr_list_t *pList;
tmr_ctrl_t *pCtrl;
pObj = *(tmr_obj_t **)timerId;
if (pObj == NULL) return;
pCtrl = pObj->pCtrl;
if (pCtrl == NULL) return;
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
bool reusable = false;
if (state == TIMER_STATE_WAITING) {
if (removeFromWheel(timer)) {
removeTimer(timer->id);
// only safe to reuse the timer when timer is removed from the wheel.
// we cannot guarantee the thread safety of the timr in all other cases.
reusable = true;
}
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] is cancelled.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
} else if (state != TIMER_STATE_EXPIRED) {
// timer already stopped or cancelled, has nothing to do in this case
} else if (timer->executedBy == taosGetPthreadId()) {
// taosTmrReset is called in the timer callback, should do nothing in this
// case to avoid dead lock. note taosTmrReset must be the last statement
// of the callback funtion, will be a bug otherwise.
} else {
assert(timer->executedBy != taosGetPthreadId());
if (pthread_mutex_lock(&pCtrl->mutex) != 0)
tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] fired, waiting...";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
if (pObj->timerId == pObj) {
pList = &(pCtrl->tmrList[pObj->index]);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pList->head = pObj->next;
for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) {
if (i % 1000 == 0) {
sched_yield();
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
}
pList->count--;
pObj->timerId = NULL;
pCtrl->numOfTmrs--;
tmrTrace("%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj,
pCtrl->numOfTmrs);
tmrMemPoolFree(pCtrl->poolHandle, (char *)(pObj));
*(tmr_obj_t **)timerId = NULL;
} else {
tmrTrace("%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj,
pCtrl->numOfTmrs);
fmt = "%s timer[id=%lld, fp=%p, param=%p] stopped.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
}
pthread_mutex_unlock(&pCtrl->mutex);
return reusable;
}
void taosTmrReset(void (*fp)(void *, void *), int mseconds, void *param1, void *handle, tmr_h *pTmrId) {
tmr_obj_t * pObj, *cNode, *pNode;
tmr_list_t *pList;
int index, period;
tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;
bool taosTmrStop(tmr_h timerId) {
uintptr_t id = (uintptr_t)timerId;
if (handle == NULL) return;
if (pTmrId == NULL) return;
tmr_obj_t* timer = findTimer(id);
if (timer == NULL) {
tmrTrace("timer[id=%lld] does not exist", id);
return false;
}
period = mseconds / pCtrl->resolution;
if (pthread_mutex_lock(&pCtrl->mutex) != 0)
tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
doStopTimer(timer, state);
timerDecRef(timer);
pObj = (tmr_obj_t *)(*pTmrId);
return state == TIMER_STATE_WAITING;
}
if (pObj && pObj->timerId == *pTmrId) {
// exist, stop it first
pList = &(pCtrl->tmrList[pObj->index]);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pList->head = pObj->next;
}
bool taosTmrStopA(tmr_h* timerId) {
bool ret = taosTmrStop(*timerId);
*timerId = NULL;
return ret;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return NULL;
}
pList->count--;
pObj->timerId = NULL;
pCtrl->numOfTmrs--;
uintptr_t id = (uintptr_t)*pTmrId;
bool stopped = false;
tmr_obj_t* timer = findTimer(id);
if (timer == NULL) {
tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id);
} else {
// timer not there, or already expired
pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle);
*pTmrId = pObj;
if (pObj == NULL) {
tmrError("%s failed to allocate timer, max:%d allocated:%d", pCtrl->label, pCtrl->maxNumOfTmrs, pCtrl->numOfTmrs);
pthread_mutex_unlock(&pCtrl->mutex);
return;
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (!doStopTimer(timer, state)) {
timerDecRef(timer);
timer = NULL;
}
stopped = state == TIMER_STATE_WAITING;
}
pObj->cycle = period / pCtrl->numOfPeriods;
pObj->param1 = param1;
pObj->fp = fp;
pObj->timerId = pObj;
pObj->pCtrl = pCtrl;
index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods;
pList = &(pCtrl->tmrList[index]);
pObj->index = index;
cNode = pList->head;
pNode = NULL;
while (cNode != NULL) {
if (cNode->cycle < pObj->cycle) {
pNode = cNode;
cNode = cNode->next;
} else {
break;
}
if (timer == NULL) {
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
return stopped;
}
pObj->next = cNode;
pObj->prev = pNode;
tmrTrace("%s timer[id=%lld] is reused", ctrl->label, timer->id);
if (cNode != NULL) {
cNode->prev = pObj;
// wait until there's no other reference to this timer,
// so that we can reuse this timer safely.
for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
if (i % 1000 == 0) {
sched_yield();
}
if (pNode != NULL) {
pNode->next = pObj;
} else {
pList->head = pObj;
}
pList->count++;
pCtrl->numOfTmrs++;
assert(timer->refCount == 1);
memset(timer, 0, sizeof(*timer));
*pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
if (pthread_mutex_unlock(&pCtrl->mutex) != 0)
tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));
tmrTrace("%s %p, timer is reset, fp:%p, tmr_h:%p, index:%d, total:%d numOfFree:%d", pCtrl->label, param1, fp, pObj,
index, pCtrl->numOfTmrs, ((pool_t *)pCtrl->poolHandle)->numOfFree);
return;
return stopped;
}
void taosTmrList(void *handle) {
int i;
tmr_list_t *pList;
tmr_obj_t * pObj;
tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;
for (i = 0; i < pCtrl->numOfPeriods; ++i) {
pList = &(pCtrl->tmrList[i]);
pObj = pList->head;
if (!pObj) continue;
printf("\nindex=%d count:%d\n", i, pList->count);
while (pObj) {
pObj = pObj->next;
static void taosTmrModuleInit(void) {
tmrCtrls = malloc(sizeof(tmr_ctrl_t) * taosMaxTmrCtrl);
if (tmrCtrls == NULL) {
tmrError("failed to allocate memory for timer controllers.");
return;
}
for (int i = 0; i < taosMaxTmrCtrl - 1; ++i) {
tmr_ctrl_t* ctrl = tmrCtrls + i;
ctrl->next = ctrl + 1;
}
}
unusedTmrCtrl = tmrCtrls;
mpool_h tmrMemPoolInit(int numOfBlock, int blockSize) {
int i;
pool_t *pool_p;
pthread_mutex_init(&tmrCtrlMutex, NULL);
if (numOfBlock <= 1 || blockSize <= 1) {
tmrError("invalid parameter in memPoolInit\n");
return NULL;
int64_t now = taosGetTimestampMs();
for (int i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
return;
}
pool_p = (pool_t *)malloc(sizeof(pool_t));
if (pool_p == NULL) {
tmrError("mempool malloc failed\n");
return NULL;
} else {
memset(pool_p, 0, sizeof(pool_t));
wheel->nextScanAt = now + wheel->resolution;
wheel->index = 0;
wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
if (wheel->slots == NULL) {
tmrError("failed to allocate wheel slots");
return;
}
pool_p->blockSize = blockSize;
pool_p->numOfBlock = numOfBlock;
pool_p->pool = (char *)malloc(blockSize * numOfBlock);
pool_p->freeList = (int *)malloc(sizeof(int) * numOfBlock);
if (pool_p->pool == NULL || pool_p->freeList == NULL) {
tmrError("failed to allocate memory\n");
tfree(pool_p->freeList);
tfree(pool_p->pool);
free(pool_p);
return NULL;
timerMap.size += wheel->size;
}
memset(pool_p->pool, 0, blockSize * numOfBlock);
for (i = 0; i < pool_p->numOfBlock; ++i) pool_p->freeList[i] = i;
timerMap.count = 0;
timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
if (timerMap.slots == NULL) {
tmrError("failed to allocate hash map");
return;
}
pool_p->first = 0;
pool_p->numOfFree = pool_p->numOfBlock;
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
return (mpool_h)pool_p;
tmrTrace("timer module is initialized, number of threads: %d", taosTmrThreads);
}
char *tmrMemPoolMalloc(mpool_h handle) {
char * pos = NULL;
pool_t *pool_p = (pool_t *)handle;
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
pthread_once(&tmrModuleInit, taosTmrModuleInit);
if (pool_p->numOfFree <= 0 || pool_p->numOfFree > pool_p->numOfBlock) {
tmrError("mempool: out of memory, numOfFree:%d, numOfBlock:%d", pool_p->numOfFree, pool_p->numOfBlock);
} else {
pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]);
pool_p->first++;
pool_p->first = pool_p->first % pool_p->numOfBlock;
pool_p->numOfFree--;
pthread_mutex_lock(&tmrCtrlMutex);
tmr_ctrl_t* ctrl = unusedTmrCtrl;
if (ctrl != NULL) {
unusedTmrCtrl = ctrl->next;
numOfTmrCtrl++;
}
pthread_mutex_unlock(&tmrCtrlMutex);
return pos;
}
void tmrMemPoolFree(mpool_h handle, char *pMem) {
int index;
pool_t *pool_p = (pool_t *)handle;
if (pMem == NULL) return;
index = (int)(pMem - pool_p->pool) / pool_p->blockSize;
if (index < 0 || index >= pool_p->numOfBlock) {
tmrError("tmr mempool: error, invalid address:%p\n", pMem);
} else {
memset(pMem, 0, pool_p->blockSize);
pool_p->freeList[(pool_p->first + pool_p->numOfFree) % pool_p->numOfBlock] = index;
pool_p->numOfFree++;
if (ctrl == NULL) {
tmrError("%s too many timer controllers, failed to create timer controller.", label);
return NULL;
}
strncpy(ctrl->label, label, sizeof(ctrl->label));
ctrl->label[sizeof(ctrl->label) - 1] = 0;
tmrTrace("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
return ctrl;
}
void tmrMemPoolCleanUp(mpool_h handle) {
pool_t *pool_p = (pool_t *)handle;
if (pool_p == NULL) return;
void taosTmrCleanUp(void* handle) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
assert(ctrl != NULL && ctrl->label[0] != 0);
if (pool_p->pool) free(pool_p->pool);
if (pool_p->freeList) free(pool_p->freeList);
memset(&pool_p, 0, sizeof(pool_p));
free(pool_p);
tmrTrace("%s timer controller is cleaned up.", ctrl->label);
ctrl->label[0] = 0;
pthread_mutex_lock(&tmrCtrlMutex);
ctrl->next = unusedTmrCtrl;
numOfTmrCtrl--;
unusedTmrCtrl = ctrl;
pthread_mutex_unlock(&tmrCtrlMutex);
}
......@@ -406,13 +406,6 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath);
}
bool taosCheckDbName(char *db, char *monitordb) {
char *pos = strchr(db, '.');
if (pos == NULL) return false;
return strncasecmp(pos + 1, monitordb, strlen(monitordb)) == 0;
}
bool taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) {
#ifdef USE_LIBICONV
iconv_t cd = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC);
......
char version[64] = "1.6.4.0";
char compatible_version[64] = "1.6.1.0";
char gitinfo[128] = "869171d2331eb25ba0901e88d33ae627bf5a9d91";
char buildinfo[512] = "Built by ubuntu at 2019-11-07 22:31";
char gitinfo[128] = "d04354a8ac2f7dd9ba521d755e5d484a203783d9";
char buildinfo[512] = "Built by root at 2019-11-11 10:23";
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册